# This patch completes the robustness and deadlock avoidance work by
# handling the writing of the bitmap.  The existing method of dealing
# with low-memory situations, keeping an emergency structure around for
# use when memory can't be allocated, won't work here because the bitmap
# buffer varies in size and because the limit on the size of a
# contiguous I/O request is unknown (to me).
# The allocation is avoided by writing directly from the bitmap rather
# than allocating a buffer and copying the relevant chunk of the
# bitmap into it.
# This has a number of consequences.  First, since the bitmap data is
# written to the COW file directly from the device's in-memory bitmap,
# bits should not be set in it until the I/O is just about to start.
# Otherwise, reads could see the newly set bits and race with the
# outgoing writes, returning data from a section of the COW file which
# has never been written.  To prevent this, reads that overlap a pending
# bitmap write are stalled until the write is finished.  Modifying the
# bitmap bits as late as possible shrinks the window in which this could
# happen.
# Second, a section of bitmap that's being written out should not be
# modified again until the write has finished.  Otherwise, a bit might
# be set and picked up by a pending I/O, resulting in it being on disk
# too soon.
# So, there are a couple of new lists.  Bitmap writes which have been
# issued, but not yet finished, are on the pending_bitmaps list.  Any
# subsequent bitmap writes which overlap a pending write have to wait;
# these are put on the waiting_bitmaps list.  Whenever a pending bitmap
# write finishes, any overlapping waiting writes are retried.  They may
# continue waiting because they overlap an earlier waiter, but at least
# one will proceed.  Third, reads which overlap a pending or waiting
# bitmap write wait until those writes have finished.  This is done by
# do_io returning -EAGAIN, which causes the queue to wait until some of
# its requests have finished.
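#
# As a rough standalone illustration (not part of the patch), "overlap"
# here always means the usual half-open interval test, which the patch
# expresses with the BETWEEN/OVERLAP macros below; ranges_overlap and the
# sample values are purely illustrative:
#
#	#include <stdio.h>
#
#	/* [s1, e1) and [s2, e2) intersect iff one range's start falls
#	 * inside the other range.
#	 */
#	static int ranges_overlap(unsigned long long s1, unsigned long long e1,
#				  unsigned long long s2, unsigned long long e2)
#	{
#		return ((s1 >= s2) && (s1 < e2)) || ((s2 >= s1) && (s2 < e1));
#	}
#
#	int main(void)
#	{
#		/* A pending bitmap write covering bytes [4, 8) blocks a new
#		 * one covering [6, 10), but not one covering [8, 12).
#		 */
#		printf("%d %d\n", ranges_overlap(4, 8, 6, 10),
#		       ranges_overlap(4, 8, 8, 12));	/* prints "1 0" */
#		return 0;
#	}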

Index: linux-2.6.17/arch/um/drivers/ubd_kern.c
===================================================================
--- linux-2.6.17.orig/arch/um/drivers/ubd_kern.c	2007-11-19 11:24:01.000000000 -0500
+++ linux-2.6.17/arch/um/drivers/ubd_kern.c	2007-11-19 11:24:43.000000000 -0500
@@ -70,19 +70,6 @@ struct io_thread_req {
 	int error;
 };
 
-extern int open_ubd_file(char *file, struct openflags *openflags, int shared,
-			 char **backing_file_out, int *bitmap_offset_out,
-			 unsigned long *bitmap_len_out, int *data_offset_out,
-			 int *create_cow_out);
-extern int create_cow_file(char *cow_file, char *backing_file,
-			   struct openflags flags, int sectorsize,
-			   int alignment, int *bitmap_offset_out,
-			   unsigned long *bitmap_len_out,
-			   int *data_offset_out);
-extern int read_cow_bitmap(int fd, void *buf, int offset, int len);
-static int do_io(struct io_thread_req *req, struct request *r,
-		 unsigned long *bitmap);
-
 static inline int ubd_test_bit(__u64 bit, void *data)
 {
 	unsigned char *buffer = data;
@@ -168,6 +155,7 @@ struct ubd {
 	unsigned no_cow:1;
 	struct cow cow;
 	struct platform_device pdev;
+	struct list_head bitmap_list;
 	struct request_queue *queue;
 	spinlock_t lock;
 	struct scatterlist sg[MAX_SG];
@@ -202,6 +190,19 @@ struct ubd {
 /* Protected by ubd_lock */
 struct ubd ubd_devs[MAX_DEV] = { [ 0 ... MAX_DEV - 1 ] = DEFAULT_UBD };
 
+extern int open_ubd_file(char *file, struct openflags *openflags, int shared,
+			 char **backing_file_out, int *bitmap_offset_out,
+			 unsigned long *bitmap_len_out, int *data_offset_out,
+			 int *create_cow_out);
+extern int create_cow_file(char *cow_file, char *backing_file,
+			   struct openflags flags, int sectorsize,
+			   int alignment, int *bitmap_offset_out,
+			   unsigned long *bitmap_len_out,
+			   int *data_offset_out);
+extern int read_cow_bitmap(int fd, void *buf, int offset, int len);
+static int do_io(struct io_thread_req *req, struct request *r,
+		 struct ubd *dev);
+
 /* Only changed by fake_ide_setup which is a setup */
 static int fake_ide = 0;
 static struct proc_dir_entry *proc_ide_root = NULL;
@@ -501,16 +502,23 @@ static void ubd_finish(struct request *r
 }
 
 struct bitmap_io {
+	struct list_head list;
+	unsigned long long start;
+	unsigned long long end;
+	struct ubd_aio *aio_ptr;
 	atomic_t count;
 	struct aio_context aio;
 };
 
 struct ubd_aio {
 	struct aio_context aio;
+	struct ubd *dev;
 	struct request *req;
 	int len;
 	struct bitmap_io *bitmap;
 	void *bitmap_buf;
+	unsigned long long bitmap_start;
+	unsigned long long bitmap_end;
 };
 
 /*
@@ -587,9 +595,129 @@ static void free_ubd_aio(struct ubd_aio 
 	else kfree(aio);
 }
 
+/* This protects both pending_bitmaps and waiting_bitmaps */
+static DEFINE_SPINLOCK(bitmaps_lock);
+static LIST_HEAD(pending_bitmaps);
+static LIST_HEAD(waiting_bitmaps);
+
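+/*
+ * Remove and return the pending bitmap write that was issued by this aio,
+ * or NULL if there isn't one.
+ */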
+static struct bitmap_io *delete_bitmap_io(struct ubd_aio *aio)
+{
+	struct list_head *entry;
+	struct bitmap_io *ret = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(b->aio_ptr == aio){
+			list_del(entry);
+			ret = b;
+			goto out;
+		}
+	}
+
+out:
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+	return ret;
+}
+
+#define BETWEEN(p, s, e) (((p) >= (s)) && ((p) < (e)))
+
+#define OVERLAP(r1s, r1e, r2s, r2e) \
+	(BETWEEN(r1s, r2s, r2e) || BETWEEN(r2s, r1s, r1e))
+
+/*
+ * Synchronization of bitmap writes -
+ *     The bitmap I/O is done directly from the bitmap in the COW structure.
+ * This means that a bit can't be modified while a range containing it is in
+ * flight on the host.  So, all pending bitmap writes are put on the
+ * pending_bitmaps list.  bitmap_start checks for any overlaps between the
+ * bitmap write it is starting and the writes on this list.  If there aren't
+ * any, the bitmap goes out.  If there are, then that bitmap is put on the
+ * waiting_bitmaps list.  Any completion of a bitmap I/O will cause this list
+ * to be walked, and any waiting, overlapping writes will be attempted.  They
+ * may still wait, because of an overlap with a previous write on the list.
+ *     Since the bitmap is only updated just before the write is sent to the
+ * host, any reads which overlap a pending or waiting write wait for the
+ * write to finish before being submitted to the host.  This check happens in
+ * do_io, which returns -EAGAIN if the read has to wait.  This causes the queue
+ * to wait until some I/O requests from it have finished, using the mechanism
+ * introduced to deal with memory shortages.
+ */
+
+/* The bitmaps_lock must already be taken */
+static int bitmap_waits(struct bitmap_io *bitmap)
+{
+	struct list_head *entry;
+
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(OVERLAP(b->start, b->end, bitmap->start, bitmap->end)){
+			if(list_empty(&bitmap->list))
+				list_add_tail(&bitmap->list, &waiting_bitmaps);
+			return 1;
+		}
+	}
+	return 0;
+}
+
 static DEFINE_SPINLOCK(restart_lock);
 static LIST_HEAD(restart);
 
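+/*
+ * Set the bits covered by the request in the in-memory bitmap and submit
+ * the bitmap chunk write directly from it.  If the chunk overlaps a write
+ * that is already in flight, it is parked on waiting_bitmaps instead (see
+ * bitmap_waits).  If the host can't accept the I/O (-EAGAIN), it is parked
+ * on the device's bitmap_list and resubmitted from do_ubd_request.
+ */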
+/* The bitmaps_lock must already be taken */
+static void bitmap_start(struct ubd_aio *aio)
+{
+	struct ubd *dev;
+	int i;
+
+	if(bitmap_waits(aio->bitmap))
+		return;
+
+	for(i = aio->bitmap_start; i < aio->bitmap_end; i++)
+		ubd_set_bit(i, aio->bitmap_buf);
+
+	aio->aio = aio->bitmap->aio;
+	aio->len = 1 << 9;
+	aio->bitmap->aio_ptr = aio;
+
+	dev = aio->dev;
+	if(submit_aio(&aio->aio) == -EAGAIN){
+		if(!list_empty(&aio->bitmap->list))
+			list_del_init(&aio->bitmap->list);
+		list_add_tail(&aio->bitmap->list, &dev->bitmap_list);
+		if(list_empty(&dev->restart)){
+			unsigned long flags;
+
+			spin_lock_irqsave(&restart_lock, flags);
+			list_add(&dev->restart, &restart);
+			spin_unlock_irqrestore(&restart_lock, flags);
+		}
+	}
+	else {
+		if(!list_empty(&aio->bitmap->list))
+			list_del_init(&aio->bitmap->list);
+		list_add(&aio->bitmap->list, &pending_bitmaps);
+		aio->bitmap = NULL;
+	}
+}
+
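+/*
+ * A bitmap write has finished; retry any waiting writes that overlap it.
+ * Some may keep waiting because they also overlap an earlier entry, but at
+ * least one of the overlapping waiters can proceed.
+ */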
+static void bitmap_unwait(struct bitmap_io *bitmap)
+{
+	struct list_head *entry, *next;
+	unsigned long flags;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each_safe(entry, next, &waiting_bitmaps){
+		struct bitmap_io *b = list_entry(entry, struct bitmap_io,
+						 list);
+		if(OVERLAP(b->start, b->end, bitmap->start, bitmap->end))
+			bitmap_start(b->aio_ptr);
+	}
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+}
+
 /* Called without dev->lock held, and only in interrupt context. */
 static void ubd_intr(struct aio_context *context)
 {
@@ -612,21 +740,24 @@ static void ubd_intr(struct aio_context 
 
 			if((aio->bitmap != NULL) &&
 			   (atomic_dec_and_test(&aio->bitmap->count))){
-				aio->aio = aio->bitmap->aio;
-				aio->len = 0;
-				free_bitmap_io(aio->bitmap);
-				aio->bitmap = NULL;
-				submit_aio(&aio->aio);
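+				/*
+				 * The last data write for this request has
+				 * finished, so set the bits and write the
+				 * bitmap chunk out.
+				 */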
+				spin_lock_irqsave(&bitmaps_lock, flags);
+				bitmap_start(aio);
+				spin_unlock_irqrestore(&bitmaps_lock, flags);
 			}
 			else {
+				struct bitmap_io *b;
+
 				if((req->nr_sectors == 0) &&
 				   (aio->bitmap == NULL)){
 					int len = req->hard_nr_sectors << 9;
 					ubd_finish(req, len);
 				}
 
-				if(aio->bitmap_buf != NULL)
-					kfree(aio->bitmap_buf);
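+				/*
+				 * If this was the bitmap write itself, take
+				 * it off pending_bitmaps and kick any
+				 * overlapping waiters.
+				 */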
+				b = delete_bitmap_io(aio);
+				if(b != NULL){
+					bitmap_unwait(b);
+					free_bitmap_io(b);
+				}
 				free_ubd_aio(aio);
 			}
 		}
@@ -634,8 +765,6 @@ static void ubd_intr(struct aio_context 
 			ubd_finish(req, n);
 			if(aio->bitmap != NULL)
 				free_bitmap_io(aio->bitmap);
-			if(aio->bitmap_buf != NULL)
-				kfree(aio->bitmap_buf);
 			free_ubd_aio(aio);
 		}
 
@@ -803,6 +932,7 @@ static int ubd_add(int n, char **error_o
 	ubd_dev->size = ROUND_BLOCK(ubd_dev->size);
 
 	INIT_LIST_HEAD(&ubd_dev->restart);
+	INIT_LIST_HEAD(&ubd_dev->bitmap_list);
 	sg_init_table(ubd_dev->sg, MAX_SG);
 
 	err = -ENOMEM;
@@ -1079,8 +1209,6 @@ static void cowify_bitmap(struct io_thre
 		if(req->bitmap_start == -1)
 			req->bitmap_start = sector + i;
 		req->bitmap_end = sector + i + 1;
-
-		ubd_set_bit(sector + i, bitmap);
 	}
 }
 
@@ -1119,8 +1247,27 @@ static void do_ubd_request(struct reques
 	struct io_thread_req *io_req;
 	struct request *req;
 	struct ubd *dev = q->queuedata;
+	struct list_head *next, *entry;
 	int last_sectors;
 
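+	/*
+	 * First, resubmit any bitmap writes that previously got -EAGAIN from
+	 * the host and were parked on this device's bitmap_list.
+	 */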
+	list_for_each_safe(entry, next, &dev->bitmap_list){
+		struct bitmap_io *bitmap = list_entry(entry, struct bitmap_io,
+						      list);
+		struct ubd_aio *aio = bitmap->aio_ptr;
+		unsigned long flags;
+
+		if(submit_aio(&aio->aio) == -EAGAIN)
+			goto out_again;
+
+		list_del_init(&bitmap->list);
+
+		spin_lock_irqsave(&bitmaps_lock, flags);
+		list_add(&bitmap->list, &pending_bitmaps);
+		spin_unlock_irqrestore(&bitmaps_lock, flags);
+
+		aio->bitmap = NULL;
+	}
+
 	while(1){
 		struct ubd *dev = q->queuedata;
 		if(dev->end_sg == 0){
@@ -1393,47 +1540,65 @@ int create_cow_file(char *cow_file, char
 	return err;
 }
 
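+/*
+ * Does the bitmap byte range [start, end) overlap any bitmap write that is
+ * currently pending on the host or waiting to be issued?  do_io uses this
+ * to stall backing-file reads until those writes have finished.
+ */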
+static int read_overlaps(int start, int end)
+{
+	struct list_head *entry;
+	unsigned long flags;
+	int ret = 0;
+
+	spin_lock_irqsave(&bitmaps_lock, flags);
+	list_for_each(entry, &pending_bitmaps){
+		struct bitmap_io *b;
+
+		b = list_entry(entry, struct bitmap_io, list);
+		if(OVERLAP(b->start, b->end, start, end)){
+			ret = 1;
+			goto out;
+		}
+	}
+
+	list_for_each(entry, &waiting_bitmaps){
+		struct bitmap_io *b;
+
+		b = list_entry(entry, struct bitmap_io, list);
+		if(OVERLAP(b->start, b->end, start, end)){
+			ret = 1;
+			goto out;
+		}
+	}
+
+out:
+	spin_unlock_irqrestore(&bitmaps_lock, flags);
+	return ret;
+}
+
 static int do_io(struct io_thread_req *req, struct request *r,
-		 unsigned long *bitmap)
+		 struct ubd *dev)
 {
 	struct ubd_aio *aio;
 	struct bitmap_io *bitmap_io = NULL;
-	char *buf;
-	void *bitmap_buf = NULL;
-	unsigned long len, sector;
-	int nsectors, start, end, bit;
-	int err;
+	char *buf, *bits;
+	unsigned long len, sector, start, *bitmap = dev->cow.bitmap;
+	int nsectors, end, bit, err;
 	__u64 off;
 
 	if(req->bitmap_start != -1){
-		/* Round up to the nearest word */
-		int round = sizeof(unsigned long);
-
 		bitmap_io = alloc_bitmap_io();
 		if(IS_ERR(bitmap_io))
 			return PTR_ERR(bitmap_io);
 
-		len = (req->bitmap_end - req->bitmap_start +
-		       round * 8 - 1) / (round * 8);
-		len *= round;
-
-		off = req->bitmap_start / (8 * round);
-		off *= round;
-
-		bitmap_buf = kmalloc(len, GFP_KERNEL);
-		if(bitmap_buf == NULL){
-			printk("do_io : kmalloc of bitmap chunk (len %ld)"
-			       "failed\n", len);
-			free_bitmap_io(bitmap_io);
-			req->error = 1;
-			return -ENOMEM;
-		}
-		memcpy(bitmap_buf, &bitmap[off / sizeof(bitmap[0])], len);
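+		/*
+		 * Byte range of the bitmap holding bits
+		 * [bitmap_start, bitmap_end): e.g. bits [9, 17) live in
+		 * bytes 1 and 2, so off = 1 and len = 2.
+		 */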
+		off = req->bitmap_start / 8;
+		len = (req->bitmap_end - 8 * off  + 8 - 1) / 8;
+		bits = &((char *) bitmap)[off];
 
 		*bitmap_io = ((struct bitmap_io)
-			{ .count	= ATOMIC_INIT(0),
+			{ .list = 	LIST_HEAD_INIT(bitmap_io->list),
+			  .aio_ptr =	NULL,
+			  .start =	off,
+			  .end =	off + len,
+			  .count	= ATOMIC_INIT(0),
 			  .aio		= INIT_AIO(AIO_WRITE, req->fds[1],
-						   bitmap_buf, len,
+						   bits, len,
 						   req->bitmap_offset + off,
 						   &ubd_aio_driver) } );
 	}
@@ -1442,14 +1607,21 @@ static int do_io(struct io_thread_req *r
 	start = 0;
 	end = nsectors;
 	bit = 0;
+	sector = req->offset / req->sectorsize;
 	do {
 		if(bitmap != NULL){
-			sector = req->offset / req->sectorsize;
-			bit = ubd_test_bit(sector + start, bitmap);
-			end = start;
-			while((end < nsectors) &&
-			      (ubd_test_bit(sector + end, bitmap) == bit))
-				end++;
+			if(req->op == AIO_READ){
+				bit = ubd_test_bit(sector + start, bitmap);
+				end = start;
+				while((end < nsectors) &&
+				      (ubd_test_bit(sector + end, bitmap) ==
+				       bit))
+					end++;
+
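+				/*
+				 * Pending and waiting ranges are bitmap byte
+				 * offsets, so convert sectors to bytes here.
+				 */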
+				if((bit == 0) &&
+				   read_overlaps((sector + start) / 8,
+						 (sector + end + 7) / 8))
+					return -EAGAIN;
+			}
+			else bit = 1;
 		}
 
 		off = req->offsets[bit] + req->offset +
@@ -1462,15 +1634,20 @@ static int do_io(struct io_thread_req *r
 			return PTR_ERR(aio);
 
 		*aio = ((struct ubd_aio)
-			{ .aio		= INIT_AIO(req->op, req->fds[bit], buf,
+			{ .dev 		= dev,
+			  .aio		= INIT_AIO(req->op, req->fds[bit], buf,
 						   len, off, &ubd_aio_driver),
 			  .len		= len,
 			  .req		= r,
 			  .bitmap	= bitmap_io,
-			  .bitmap_buf 	= bitmap_buf });
+			  .bitmap_buf 	= bitmap,
+			  .bitmap_start	= sector + start,
+			  .bitmap_end	= sector + end });
 
-		if(aio->bitmap != NULL)
+		if(aio->bitmap != NULL){
 			atomic_inc(&aio->bitmap->count);
+			aio->bitmap->aio_ptr = aio;
+		}
 
 		err = submit_aio(&aio->aio);
 		if(err){
@@ -1479,9 +1656,17 @@ static int do_io(struct io_thread_req *r
 				       "err = %d\n", err);
 				req->error = 1;
 			}
+			else {
+				if(aio->bitmap != NULL)
+					free_bitmap_io(aio->bitmap);
+				free_ubd_aio(aio);
+			}
 			return err;
 		}
 
+		if(aio->bitmap != NULL)
+			r->nr_sectors++;
+
 		start = end;
 	} while(start < nsectors);