# This patch completes the robustness and deadlock avoidance work by
# handling the writing of the bitmap.  The existing method of dealing
# with low-memory situations by having an emergency structure for use
# when memory can't be allocated won't work here because of the
# variable size of the bitmap buffer, and the unknown (to me) limit of
# a contiguous I/O request.
#
# The allocation is avoided by writing directly from the bitmap rather
# than allocating a buffer and copying the relevant chunk of the
# bitmap into it.
#
# This has a number of consequences.  First, since the bitmap is
# written directly from the device's bitmap, bits should not be set in
# it until the I/O is just about to start.  Otherwise, reads could see
# those bits and race with the outgoing writes, returning data from a
# section of the COW file which has never been written.  To prevent
# this, reads that overlap a pending bitmap write are stalled until
# the write is finished.  Modifying the bitmap bits as late as
# possible shrinks the window in which this could happen.
#
# Second, a section of bitmap that's being written out should not be
# modified again until the write has finished.  Otherwise, a bit might
# be set and picked up by a pending I/O, resulting in it being on disk
# too soon.
#
# So, there are a couple of new lists.  Bitmap writes which have been
# issued but not finished are on the pending_bitmaps list.  Any
# subsequent bitmap writes which overlap a pending write have to
# wait; these are put on the waiting_bitmaps list.  Whenever a
# pending bitmap write finishes, any overlapping waiting writes are
# attempted.  They may continue waiting because they overlap an
# earlier waiter, but at least one will proceed.
#
# Third, reads which overlap a pending or waiting bitmap write will
# wait until those writes have finished.  This is done by do_io
# returning -EAGAIN, causing the queue to wait until some requests
# have finished.

Index: linux-2.6.17/arch/um/drivers/ubd_kern.c
===================================================================
--- linux-2.6.17.orig/arch/um/drivers/ubd_kern.c	2007-11-19 11:24:01.000000000 -0500
+++ linux-2.6.17/arch/um/drivers/ubd_kern.c	2007-11-19 11:24:43.000000000 -0500
@@ -70,19 +70,6 @@ struct io_thread_req {
 	int error;
 };
 
-extern int open_ubd_file(char *file, struct openflags *openflags, int shared,
-			 char **backing_file_out, int *bitmap_offset_out,
-			 unsigned long *bitmap_len_out, int *data_offset_out,
-			 int *create_cow_out);
-extern int create_cow_file(char *cow_file, char *backing_file,
-			   struct openflags flags, int sectorsize,
-			   int alignment, int *bitmap_offset_out,
-			   unsigned long *bitmap_len_out,
-			   int *data_offset_out);
-extern int read_cow_bitmap(int fd, void *buf, int offset, int len);
-static int do_io(struct io_thread_req *req, struct request *r,
-		 unsigned long *bitmap);
-
 static inline int ubd_test_bit(__u64 bit, void *data)
 {
 	unsigned char *buffer = data;
@@ -168,6 +155,7 @@ struct ubd {
 	unsigned no_cow:1;
 	struct cow cow;
 	struct platform_device pdev;
+	struct list_head bitmap_list;
 	struct request_queue *queue;
 	spinlock_t lock;
 	struct scatterlist sg[MAX_SG];
@@ -202,6 +190,19 @@ struct ubd {
 /* Protected by ubd_lock */
 struct ubd ubd_devs[MAX_DEV] = { [ 0 ... MAX_DEV - 1 ] = DEFAULT_UBD };
 
+extern int open_ubd_file(char *file, struct openflags *openflags, int shared,
+			 char **backing_file_out, int *bitmap_offset_out,
+			 unsigned long *bitmap_len_out, int *data_offset_out,
+			 int *create_cow_out);
+extern int create_cow_file(char *cow_file, char *backing_file,
+			   struct openflags flags, int sectorsize,
+			   int alignment, int *bitmap_offset_out,
+			   unsigned long *bitmap_len_out,
+			   int *data_offset_out);
+extern int read_cow_bitmap(int fd, void *buf, int offset, int len);
+static int do_io(struct io_thread_req *req, struct request *r,
+		 struct ubd *dev);
+
 /* Only changed by fake_ide_setup which is a setup */
 static int fake_ide = 0;
 static struct proc_dir_entry *proc_ide_root = NULL;
@@ -501,16 +502,23 @@ static void ubd_finish(struct request *r
 }
 
 struct bitmap_io {
+	struct list_head list;
+	unsigned long long start;
+	unsigned long long end;
+	struct ubd_aio *aio_ptr;
 	atomic_t count;
 	struct aio_context aio;
 };
 
 struct ubd_aio {
 	struct aio_context aio;
+	struct ubd *dev;
 	struct request *req;
 	int len;
 	struct bitmap_io *bitmap;
 	void *bitmap_buf;
+	unsigned long long bitmap_start;
+	unsigned long long bitmap_end;
 };
 
 /*
@@ -587,9 +595,129 @@ static void free_ubd_aio(struct ubd_aio
 	else kfree(aio);
 }
 
+/* This protects both pending_bitmaps and waiting_bitmaps */
+static DEFINE_SPINLOCK(bitmaps_lock);
+static LIST_HEAD(pending_bitmaps);
+static LIST_HEAD(waiting_bitmaps);
+
+static struct bitmap_io *delete_bitmap_io(struct ubd_aio *aio)
+{
+	struct list_head *entry;
+	struct bitmap_io *ret = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(b->aio_ptr == aio){
+			list_del(entry);
+			ret = b;
+			goto out;
+		}
+	}
+
+out:
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+	return ret;
+}
+
+#define BETWEEN(p, s, e) (((p) >= (s)) && ((p) < (e)))
+
+#define OVERLAP(r1s, r1e, r2s, r2e) \
+	(BETWEEN(r1s, r2s, r2e) || BETWEEN(r2s, r1s, r1e))
+
+/*
+ * Synchronization of bitmap writes -
+ * The bitmap I/O is done directly from the bitmap in the COW structure.
+ * This means that a bit can't be modified while a range containing it is in
+ * flight on the host.  So, all pending bitmap writes are put on the
+ * pending_bitmaps list.  bitmap_start checks for any overlaps between the
+ * bitmap write it is starting and the writes on this list.  If there aren't
+ * any, the bitmap goes out.  If there are, then that bitmap is put on the
+ * waiting_bitmaps list.  Any completion of a bitmap I/O will cause this list
+ * to be walked, and any waiting, overlapping writes will be attempted.  They
+ * may still wait, because of an overlap with a previous write on the list.
+ * Since the bitmap is only updated just before the write is sent to the
+ * host, any reads which overlap a pending or waiting write wait for the
+ * write to finish before being submitted to the host.  This check happens
+ * in do_io, which returns -EAGAIN if the read has to wait.  This causes the
+ * queue to wait until some I/O requests from it have finished, using the
+ * mechanism introduced to deal with memory shortages.
+ */
+
+/* The bitmaps_lock must already be taken */
+static int bitmap_waits(struct bitmap_io *bitmap)
+{
+	struct list_head *entry;
+
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(OVERLAP(b->start, b->end, bitmap->start, bitmap->end)){
+			if(list_empty(&bitmap->list))
+				list_add_tail(&bitmap->list, &waiting_bitmaps);
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
 static DEFINE_SPINLOCK(restart_lock);
 static LIST_HEAD(restart);
 
+/* The bitmaps_lock must already be taken */
+static void bitmap_start(struct ubd_aio *aio)
+{
+	struct ubd *dev;
+	int i;
+
+	if(bitmap_waits(aio->bitmap))
+		return;
+
+	for(i = aio->bitmap_start; i < aio->bitmap_end; i++)
+		ubd_set_bit(i, aio->bitmap_buf);
+
+	aio->aio = aio->bitmap->aio;
+	aio->len = 1 << 9;
+	aio->bitmap->aio_ptr = aio;
+
+	dev = aio->dev;
+	if(submit_aio(&aio->aio) == -EAGAIN){
+		if(!list_empty(&aio->bitmap->list))
+			list_del_init(&aio->bitmap->list);
+		list_add_tail(&aio->bitmap->list, &dev->bitmap_list);
+		if(list_empty(&dev->restart)){
+			unsigned long flags;
+
+			spin_lock_irqsave(&restart_lock, flags);
+			list_add(&dev->restart, &restart);
+			spin_unlock_irqrestore(&restart_lock, flags);
+		}
+	}
+	else {
+		if(!list_empty(&aio->bitmap->list))
+			list_del_init(&aio->bitmap->list);
+		list_add(&aio->bitmap->list, &pending_bitmaps);
+		aio->bitmap = NULL;
+	}
+}
+
+static void bitmap_unwait(struct bitmap_io *bitmap)
+{
+	struct list_head *entry, *next;
+	unsigned long flags;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each_safe(entry, next, &waiting_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(OVERLAP(b->start, b->end, bitmap->start, bitmap->end))
+			bitmap_start(b->aio_ptr);
+	}
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+}
+
 /* Called without dev->lock held, and only in interrupt context.
  */
 static void ubd_intr(struct aio_context *context)
 {
@@ -612,21 +740,24 @@ static void ubd_intr(struct aio_context
 
 	if((aio->bitmap != NULL) &&
 	   (atomic_dec_and_test(&aio->bitmap->count))){
-		aio->aio = aio->bitmap->aio;
-		aio->len = 0;
-		free_bitmap_io(aio->bitmap);
-		aio->bitmap = NULL;
-		submit_aio(&aio->aio);
+		spin_lock_irqsave(&bitmaps_lock, flags);
+		bitmap_start(aio);
+		spin_unlock_irqrestore(&bitmaps_lock, flags);
 	}
 	else {
+		struct bitmap_io *b;
+
 		if((req->nr_sectors == 0) &&
 		   (aio->bitmap == NULL)){
 			int len = req->hard_nr_sectors << 9;
 			ubd_finish(req, len);
 		}
 
-		if(aio->bitmap_buf != NULL)
-			kfree(aio->bitmap_buf);
+		b = delete_bitmap_io(aio);
+		if(b != NULL){
+			bitmap_unwait(b);
+			free_bitmap_io(b);
+		}
 		free_ubd_aio(aio);
 	}
 }
@@ -634,8 +765,6 @@ static void ubd_intr(struct aio_context
 		ubd_finish(req, n);
 		if(aio->bitmap != NULL)
 			free_bitmap_io(aio->bitmap);
-		if(aio->bitmap_buf != NULL)
-			kfree(aio->bitmap_buf);
 		free_ubd_aio(aio);
 	}
 
@@ -803,6 +932,7 @@ static int ubd_add(int n, char **error_o
 	ubd_dev->size = ROUND_BLOCK(ubd_dev->size);
 
 	INIT_LIST_HEAD(&ubd_dev->restart);
+	INIT_LIST_HEAD(&ubd_dev->bitmap_list);
 	sg_init_table(ubd_dev->sg, MAX_SG);
 
 	err = -ENOMEM;
@@ -1079,8 +1209,6 @@ static void cowify_bitmap(struct io_thre
 
 		if(req->bitmap_start == -1)
 			req->bitmap_start = sector + i;
 		req->bitmap_end = sector + i + 1;
-
-		ubd_set_bit(sector + i, bitmap);
 	}
 }
@@ -1119,8 +1247,27 @@ static void do_ubd_request(struct reques
 	struct io_thread_req *io_req;
 	struct request *req;
 	struct ubd *dev = q->queuedata;
+	struct list_head *next, *entry;
 	int last_sectors;
 
+	list_for_each_safe(entry, next, &dev->bitmap_list){
+		struct bitmap_io *bitmap = list_entry(entry, struct bitmap_io,
+						      list);
+		struct ubd_aio *aio = bitmap->aio_ptr;
+		unsigned long flags;
+
+		if(submit_aio(&aio->aio) == -EAGAIN)
+			goto out_again;
+
+		list_del_init(&bitmap->list);
+
+		spin_lock_irqsave(&bitmaps_lock, flags);
+		list_add(&bitmap->list, &pending_bitmaps);
+		spin_unlock_irqrestore(&bitmaps_lock, flags);
+
+		aio->bitmap = NULL;
+	}
+
 	while(1){
 		struct ubd *dev = q->queuedata;
 		if(dev->end_sg == 0){
@@ -1393,47 +1540,65 @@ int create_cow_file(char *cow_file, char
 	return err;
 }
 
+static int read_overlaps(int start, int end)
+{
+	struct list_head *entry;
+	unsigned long flags;
+	int ret = 0;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b;
+
+		b = list_entry(entry, struct bitmap_io, list);
+		if(OVERLAP(b->start, b->end, start, end)){
+			ret = 1;
+			goto out;
+		}
+	}
+
+	list_for_each(entry, &waiting_bitmaps){
+		struct bitmap_io *b;
+
+		b = list_entry(entry, struct bitmap_io, list);
+		if(OVERLAP(b->start, b->end, start, end)){
+			ret = 1;
+			goto out;
+		}
+	}
+
+out:
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+	return ret;
+}
+
 static int do_io(struct io_thread_req *req, struct request *r,
-		 unsigned long *bitmap)
+		 struct ubd *dev)
 {
 	struct ubd_aio *aio;
 	struct bitmap_io *bitmap_io = NULL;
-	char *buf;
-	void *bitmap_buf = NULL;
-	unsigned long len, sector;
-	int nsectors, start, end, bit;
-	int err;
+	char *buf, *bits;
+	unsigned long len, sector, start, *bitmap = dev->cow.bitmap;
+	int nsectors, end, bit, err;
 	__u64 off;
 
 	if(req->bitmap_start != -1){
-		/* Round up to the nearest word */
-		int round = sizeof(unsigned long);
-
 		bitmap_io = alloc_bitmap_io();
 		if(IS_ERR(bitmap_io))
 			return PTR_ERR(bitmap_io);
 
-		len = (req->bitmap_end - req->bitmap_start +
-		       round * 8 - 1) / (round * 8);
-		len *= round;
-
-		off = req->bitmap_start / (8 * round);
-		off *= round;
-
-		bitmap_buf = kmalloc(len, GFP_KERNEL);
-		if(bitmap_buf == NULL){
-			printk("do_io : kmalloc of bitmap chunk (len %ld)"
-			       "failed\n", len);
-			free_bitmap_io(bitmap_io);
-			req->error = 1;
-			return -ENOMEM;
-		}
-		memcpy(bitmap_buf, &bitmap[off / sizeof(bitmap[0])], len);
+		off = req->bitmap_start / 8;
+		len = (req->bitmap_end - 8 * off + 8 - 1) / 8;
+		bits = &((char *) bitmap)[off];
 
 		*bitmap_io = ((struct bitmap_io)
-			{ .count	= ATOMIC_INIT(0),
+			{ .list		= LIST_HEAD_INIT(bitmap_io->list),
+			  .aio_ptr	= NULL,
+			  .start	= off,
+			  .end		= off + len,
+			  .count	= ATOMIC_INIT(0),
 			  .aio		= INIT_AIO(AIO_WRITE, req->fds[1],
-						   bitmap_buf, len,
+						   bits, len,
 						   req->bitmap_offset + off,
 						   &ubd_aio_driver) } );
 	}
@@ -1442,14 +1607,21 @@ static int do_io(struct io_thread_req *r
 	start = 0;
 	end = nsectors;
 	bit = 0;
+	sector = req->offset / req->sectorsize;
 	do {
 		if(bitmap != NULL){
-			sector = req->offset / req->sectorsize;
-			bit = ubd_test_bit(sector + start, bitmap);
-			end = start;
-			while((end < nsectors) &&
-			      (ubd_test_bit(sector + end, bitmap) == bit))
-				end++;
+			if(req->op == AIO_READ){
+				bit = ubd_test_bit(sector + start, bitmap);
+				end = start;
+				while((end < nsectors) &&
+				      (ubd_test_bit(sector + end, bitmap) ==
+				       bit))
+					end++;
+
+				if((bit == 0) && read_overlaps(start, end))
+					return -EAGAIN;
+			}
+			else bit = 1;
 		}
 
 		off = req->offsets[bit] + req->offset +
@@ -1462,15 +1634,20 @@ static int do_io(struct io_thread_req *r
 			return PTR_ERR(aio);
 
 		*aio = ((struct ubd_aio)
-			{ .aio		= INIT_AIO(req->op, req->fds[bit], buf,
+			{ .dev		= dev,
+			  .aio		= INIT_AIO(req->op, req->fds[bit], buf,
 						   len, off, &ubd_aio_driver),
 			  .len		= len,
 			  .req		= r,
 			  .bitmap	= bitmap_io,
-			  .bitmap_buf	= bitmap_buf });
+			  .bitmap_buf	= bitmap,
+			  .bitmap_start	= sector + start,
+			  .bitmap_end	= sector + end });
 
-		if(aio->bitmap != NULL)
+		if(aio->bitmap != NULL){
 			atomic_inc(&aio->bitmap->count);
+			aio->bitmap->aio_ptr = aio;
+		}
 
 		err = submit_aio(&aio->aio);
@@ -1479,9 +1656,17 @@ static int do_io(struct io_thread_req *r
 			       "err = %d\n", err);
 				req->error = 1;
 			}
+			else {
+				if(aio->bitmap != NULL)
+					free_bitmap_io(aio->bitmap);
+				free_ubd_aio(aio);
+			}
 
 			return err;
 		}
 
+		if(aio->bitmap != NULL)
+			r->nr_sectors++;
+
 		start = end;
 	} while(start < nsectors);
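
For anyone reviewing the overlap checks above, here is a minimal,
stand-alone user-space sketch of the interval semantics that
bitmap_waits and read_overlaps rely on.  This is an illustration only,
not part of the patch: the BETWEEN/OVERLAP macros are copied from it,
while the file name and the main() driver are hypothetical.

/* overlap_test.c (hypothetical) - build with: cc -o overlap_test overlap_test.c */
#include <assert.h>
#include <stdio.h>

/* Copied from the patch: p lies inside the half-open range [s, e) */
#define BETWEEN(p, s, e) (((p) >= (s)) && ((p) < (e)))

/* Two half-open ranges overlap iff either one's start falls inside the other */
#define OVERLAP(r1s, r1e, r2s, r2e) \
	(BETWEEN(r1s, r2s, r2e) || BETWEEN(r2s, r1s, r1e))

int main(void)
{
	/* Adjacent ranges do not overlap - the end is exclusive */
	assert(!OVERLAP(0, 4, 4, 8));

	/* A single shared position is enough to force a later bitmap
	 * write, or an overlapping read, to wait */
	assert(OVERLAP(0, 5, 4, 8));

	/* Containment counts as overlap in both directions */
	assert(OVERLAP(2, 3, 0, 8));
	assert(OVERLAP(0, 8, 2, 3));

	printf("overlap semantics hold\n");
	return 0;
}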