MPIPE update (patch01)

Matthew Dillon dillon at apollo.backplane.com
Fri Mar 19 14:55:03 PST 2004


    I've revamped the MPIPE allocator code somewhat to provide a wider range
    of services to the kernel.  My intention is that we will soon be using
    it to replace the mbuf allocator and possibly also zalloc().  The
    MPIPE API is very similar to the FreeBSD-5 uma_zalloc() API (which
    replaced FreeBSD-4's zalloc()), except that it provides a better-managed
    abstraction.

    Basically, the MPIPE abstraction is able to guarantee a certain nominal
    number of outstanding allocations, and gives you the choice of blocking
    or non-blocking allocations beyond that, up to a maximum (which can be
    set to be virtually unlimited if no maximum is desired).  So, for example,
    the ATA code needs a guarantee that at least one request can be allocated
    in order to 'chain' restarts from the last operation's interrupt
    completion.  The ATA code in FreeBSD can potentially stall if malloc()
    ever returns NULL (even though it purportedly can handle NULL), but the
    ATA code in DragonFly is guaranteed by design to never stall.
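
    For reference, here is a minimal usage sketch of the new API from the
    patch below.  The M_FOOREQ malloc type, the foo_request structure, the
    foo_*() functions, and the counts are all made up for illustration; only
    the mpipe_init()/mpipe_alloc_nowait()/mpipe_alloc_waitok()/mpipe_free()
    calls themselves come from the patch:

        #include <sys/param.h>
        #include <sys/malloc.h>
        #include <sys/mpipe.h>

        static MALLOC_DEFINE(M_FOOREQ, "Foo Requests", "foo driver requests");

        struct foo_request {
                int     cmd;
                /* ... */
        };

        static struct malloc_pipe foo_req_mpipe;

        static void
        foo_attach(void)
        {
                /* guarantee 4 preallocated requests, allow up to 256 total */
                mpipe_init(&foo_req_mpipe, M_FOOREQ,
                            sizeof(struct foo_request), 4, 256, 0, NULL);
        }

        static void
        foo_intr_restart(void)
        {
                struct foo_request *req;

                /*
                 * Restart path: guaranteed non-NULL while fewer than the
                 * nominal count (4 here) are outstanding, may return NULL
                 * beyond that.
                 */
                if ((req = mpipe_alloc_nowait(&foo_req_mpipe)) == NULL)
                        return;
                /* ... issue the request ... */
                mpipe_free(&foo_req_mpipe, req); /* unblocks any waiters */
        }

        static void
        foo_start(void)
        {
                struct foo_request *req;

                /*
                 * Normal path: blocks until a prior allocation is freed
                 * rather than ever returning NULL.
                 */
                req = mpipe_alloc_waitok(&foo_req_mpipe);
                /* ... issue the request ... */
                mpipe_free(&foo_req_mpipe, req);
        }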

    Most, if not all, of the allocations done in the I/O path used by the
    pageout code are eventually going to have to be converted to use MPIPE
    instead of zalloc() or malloc() in order to guarantee continued operation
    in low-memory situations.

    Here is a beta patch (only lightly tested).  The final version will go
    into the tree in three days.

					-Matt
					Matthew Dillon 
					<dillon at xxxxxxxxxxxxx>

Index: sys/mpipe.h
===================================================================
RCS file: /cvs/src/sys/sys/mpipe.h,v
retrieving revision 1.1
diff -u -r1.1 mpipe.h
--- sys/mpipe.h	30 Nov 2003 20:13:53 -0000	1.1
+++ sys/mpipe.h	19 Mar 2004 19:52:00 -0000
@@ -34,32 +34,62 @@
 #endif
 
 /*
- * Pipeline memory allocations.  This implements a pipeline for allocations
- * of a particular size.  It is used in order to allow memory allocations
- * to block while at the same time guarenteeing that no deadlocks will occur.
+ * Pipeline memory allocations with persistent store capabilities.  This
+ * implements a pipeline for allocations of a particular size.  It is used
+ * in order to allow memory allocations to block while at the same time
+ * guarenteeing that no deadlocks will occur.
+ *
+ * By default new allocations are zero'd out. 
+ *
+ * MPF_NOZERO		If specified the underlying buffers are not zero'd.
+ *			Note this also means you have no way of knowing which
+ *			buffers are coming from the cache and which are new
+ *			allocations.
+ *
+ * MPF_CACHEDATA	If specified the deconstructor will be called when
+ *			the underlying buffer is free()'d, but the buffer may
+ *			be reused many times before/if that happens.  The
+ *			buffer is NOT zero'd on reuse regardless of the
+ *			MPF_NOZERO flag.
+ *
+ *			If not specified and MPF_NOZERO is also not specified,
+ *			then buffers reused from the cache will be zero'd as
+ *			well as new allocations.
+ *
+ *			Note that the deconstructor function may still be NULL
+ *			if this flag is specified, meaning that you don't need
+ *			notification when the cached contents is physically
+ *			free()'d.
  */
 struct mpipe_buf;
 
 struct malloc_pipe {
-    TAILQ_HEAD(, mpipe_buf) queue;
     malloc_type_t type;		/* malloc bucket */
     int		bytes;		/* allocation size */
+    int		mpflags;	/* MPF_ flags */
+    int		mflags;		/* M_ flags (used internally) */
     int		pending;	/* there is a request pending */
-    int		free_count;	/* entries in free list */
-    int		total_count;	/* total free + outstanding */
-    int		max_count;	/* maximum cache size */
-    void	(*trigger)(void *data);	/* trigger function on free */
-    void	*trigger_data;
+    int		free_count;	/* entries in array[] */
+    int		total_count;	/* total outstanding allocations incl free */
+    int		ary_count;	/* guarenteed allocation count */
+    int		max_count;	/* maximum count (M_NOWAIT used beyond nom) */
+    void	**array;	/* array[ary_count] */
+    void	(*deconstruct)(struct malloc_pipe *, void *buf);
 };
 
+#define MPF_CACHEDATA		0x0001	/* cache old buffers (do not zero) */ 
+#define MPF_NOZERO		0x0002	/* do not zero-out new allocations */
+
 typedef struct malloc_pipe *malloc_pipe_t;
 
 #ifdef _KERNEL
 
 void mpipe_init(malloc_pipe_t mpipe, malloc_type_t type,
-		int bytes, int nnow, int nmax);
+		int bytes, int nnom, int nmax, 
+		int mpflags, void (*deconstruct)(struct malloc_pipe *, void *));
 void mpipe_done(malloc_pipe_t mpipe);
-void *mpipe_alloc(malloc_pipe_t mpipe, int flags);
+void *mpipe_alloc_waitok(malloc_pipe_t mpipe);
+void *mpipe_alloc_nowait(malloc_pipe_t mpipe);
 void mpipe_free(malloc_pipe_t mpipe, void *vbuf);
 
 #endif
Index: kern/kern_mpipe.c
===================================================================
RCS file: /cvs/src/sys/kern/kern_mpipe.c,v
retrieving revision 1.2
diff -u -r1.2 kern_mpipe.c
--- kern/kern_mpipe.c	20 Jan 2004 05:04:06 -0000	1.2
+++ kern/kern_mpipe.c	19 Mar 2004 19:57:12 -0000
@@ -41,127 +41,189 @@
 
 #define arysize(ary)	(sizeof(ary)/sizeof((ary)[0]))
 
-typedef struct mpipe_buf {
-	TAILQ_ENTRY(mpipe_buf)	entry;
-} *mpipe_buf_t;
+static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxillary MPIPE structure");
 
 /*
  * Initialize a malloc pipeline for the specified malloc type and allocation
- * size, and immediately allocate nnow buffers and set the nominal maximum
- * to nmax.
+ * size.  Create an array to cache up to nom_count buffers and preallocate
+ * them.
  */
 void
 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
-	int nnow, int nmax)
+	int nnom, int nmax, 
+	int mpflags, void (*deconstruct)(struct malloc_pipe *, void *))
 {
-    if (bytes < sizeof(struct mpipe_buf))
-	bytes = sizeof(struct mpipe_buf);
+    int n;
+
+    if (nnom < 1)
+	nnom = 1;
+    if (nmax < 0)
+	nmax = 0x7FFF0000;	/* some very large number */
+    if (nmax < nnom)
+	nmax = nnom;
     bzero(mpipe, sizeof(struct malloc_pipe));
-    TAILQ_INIT(&mpipe->queue);
     mpipe->type = type;
     mpipe->bytes = bytes;
+    mpipe->mpflags = mpflags;
+    mpipe->deconstruct = deconstruct;
+    if ((mpflags & MPF_NOZERO) == 0)
+	mpipe->mflags |= M_ZERO;
+    mpipe->ary_count = nnom;
     mpipe->max_count = nmax;
-    if (nnow > 0) {
-	void *buf;
+    mpipe->array = malloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY, 
+			    M_WAITOK | M_ZERO);
 
-	buf = malloc(bytes, mpipe->type, M_WAITOK);
-	KKASSERT(buf != NULL);
+    while (mpipe->free_count < nnom) {
+	n = mpipe->free_count;
+	mpipe->array[n] = malloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
+	++mpipe->free_count;
 	++mpipe->total_count;
-	mpipe_free(mpipe, buf);
-	while (--nnow > 0) {
-	    buf = malloc(bytes, mpipe->type, M_SYSNOWAIT);
-	    if (buf == NULL)
-		break;
-	    ++mpipe->total_count;
-	    mpipe_free(mpipe, buf);
-	}
     }
-    if (mpipe->max_count < mpipe->total_count)
-	mpipe->max_count = mpipe->total_count;
 }
 
 void
 mpipe_done(malloc_pipe_t mpipe)
 {
-    struct mpipe_buf *buf;
+    void *buf;
+    int n;
 
-    KKASSERT(mpipe->free_count == mpipe->total_count);
-    while (mpipe->free_count) {
-	buf = TAILQ_FIRST(&mpipe->queue);
+    KKASSERT(mpipe->free_count == mpipe->total_count);	/* no outstanding mem */
+    while (--mpipe->free_count >= 0) {
+	n = mpipe->free_count;
+	buf = mpipe->array[n];
+	mpipe->array[n] = NULL;
 	KKASSERT(buf != NULL);
-	TAILQ_REMOVE(&mpipe->queue, buf, entry);
-	--mpipe->free_count;
 	--mpipe->total_count;
+	if (mpipe->deconstruct)
+	    mpipe->deconstruct(mpipe, buf);
 	free(buf, mpipe->type);
     }
-    KKASSERT(TAILQ_EMPTY(&mpipe->queue));
 }
 
 /*
- * Allocate an entry.  flags can be M_RNOWAIT which tells us not to block.
- * Unlike a normal malloc, if we block in mpipe_alloc() no deadlock will occur
- * because it will unblock the moment an existing in-use buffer is freed.
+ * Allocate an entry, nominally non-blocking.  The allocation is guarenteed
+ * to return non-NULL up to the nominal count after which it may return NULL.
+ * Note that the implementation is defined to be allowed to block for short
+ * periods of time.  Use mpipe_alloc_waitok() to guarentee the allocation.
  */
 void *
-mpipe_alloc(malloc_pipe_t mpipe, int flags)
+mpipe_alloc_nowait(malloc_pipe_t mpipe)
 {
-    mpipe_buf_t buf;
+    void *buf;
+    int n;
 
     crit_enter();
-    while (mpipe->free_count == 0) {
-	if (mpipe->total_count < mpipe->max_count) {
+    if ((n = mpipe->free_count) != 0) {
+	/*
+	 * Use a free entry if it exists.
+	 */
+	--n;
+	buf = mpipe->array[n];
+	mpipe->array[n] = NULL;	/* sanity check, not absolutely needed */
+	mpipe->free_count = n;
+    } else if (mpipe->total_count >= mpipe->max_count) {
+	/*
+	 * Return NULL if we have hit our limit
+	 */
+	buf = NULL;
+    } else {
+	/*
+	 * Otherwise try to malloc() non-blocking.
+	 */
+	buf = malloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
+	if (buf)
 	    ++mpipe->total_count;
-	    if ((buf = malloc(mpipe->bytes, mpipe->type, flags)) != NULL) {
-		crit_exit();
-		return(buf);
-	    }
-	    --mpipe->total_count;
-	} else if (flags & M_RNOWAIT) {
-	    crit_exit();
-	    return(NULL);
-	} else {
+    }
+    crit_exit();
+    return(buf);
+}
+
+/*
+ * Allocate an entry, block until the allocation succeeds.  This may cause
+ * us to block waiting for a prior allocation to be freed.
+ */
+void *
+mpipe_alloc_waitok(malloc_pipe_t mpipe)
+{
+    void *buf;
+    int n;
+    int mfailed;
+
+    crit_enter();
+    mfailed = 0;
+    for (;;) {
+	if ((n = mpipe->free_count) != 0) {
+	    /*
+	     * Use a free entry if it exists.
+	     */
+	    --n;
+	    buf = mpipe->array[n];
+	    mpipe->array[n] = NULL;
+	    mpipe->free_count = n;
+	    break;
+	}
+	if (mpipe->total_count >= mpipe->max_count || mfailed) {
+	    /*
+	     * Block if we have hit our limit
+	     */
 	    mpipe->pending = 1;
-	    tsleep(mpipe, 0, "mpipe", 0);
+	    tsleep(mpipe, 0, "mpipe1", 0);
+	    continue;
 	}
+	/*
+	 * Otherwise try to malloc() non-blocking.  If that fails loop to
+	 * recheck, and block instead of trying to malloc() again.
+	 */
+	buf = malloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
+	if (buf) {
+	    ++mpipe->total_count;
+	    break;
+	}
+	mfailed = 1;
     }
-    buf = TAILQ_FIRST(&mpipe->queue);
-    KKASSERT(buf != NULL);
-    TAILQ_REMOVE(&mpipe->queue, buf, entry);
-    --mpipe->free_count;
     crit_exit();
-    if (flags & M_ZERO)
-	bzero(buf, mpipe->bytes);
     return(buf);
 }
 
 /*
- * Free an entry, unblock any waiters.
+ * Free an entry, unblock any waiters.  Allow NULL.
  */
 void
-mpipe_free(malloc_pipe_t mpipe, void *vbuf)
+mpipe_free(malloc_pipe_t mpipe, void *buf)
 {
-    struct mpipe_buf *buf;
+    int n;
+
+    if (buf == NULL)
+	return;
 
-    if ((buf = vbuf) != NULL) {
-	crit_enter();
-	if (mpipe->total_count > mpipe->max_count) {
-	    --mpipe->total_count;
-	    crit_exit();
-	    free(buf, mpipe->type);
-	} else {
-	    TAILQ_INSERT_TAIL(&mpipe->queue, buf, entry);
-	    ++mpipe->free_count;
-	    crit_exit();
-	    if (mpipe->free_count >= (mpipe->total_count >> 2) + 1) {
-		if (mpipe->trigger) {
-		    mpipe->trigger(mpipe->trigger_data);
-		}
-		if (mpipe->pending) {
-		    mpipe->pending = 0;
-		    wakeup(mpipe);
-		}
-	    }
+    crit_enter();
+    if ((n = mpipe->free_count) < mpipe->ary_count) {
+	/*
+	 * Free slot available in free array (LIFO)
+	 */
+	mpipe->array[n] = buf;
+	++mpipe->free_count;
+	if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0) 
+	    bzero(buf, mpipe->bytes);
+	crit_exit();
+
+	/*
+	 * Wakeup anyone blocked in mpipe_alloc_*().
+	 */
+	if (mpipe->pending) {
+	    mpipe->pending = 0;
+	    wakeup(mpipe);
 	}
+    } else {
+	/*
+	 * All the free slots are full, free the buffer directly.
+	 */
+	--mpipe->total_count;
+	KKASSERT(mpipe->total_count >= mpipe->free_count);
+	if (mpipe->deconstruct)
+	    mpipe->deconstruct(mpipe, buf);
+	crit_exit();
+	free(buf, mpipe->type);
     }
 }
 
Index: dev/disk/ata/ata-all.c
===================================================================
RCS file: /cvs/src/sys/dev/disk/ata/ata-all.c,v
retrieving revision 1.14
diff -u -r1.14 ata-all.c
--- dev/disk/ata/ata-all.c	15 Mar 2004 01:10:42 -0000	1.14
+++ dev/disk/ata/ata-all.c	19 Mar 2004 19:49:39 -0000
@@ -167,8 +167,10 @@
     TAILQ_INIT(&ch->ata_queue);
     TAILQ_INIT(&ch->atapi_queue);
 
-    mpipe_init(&ch->req_mpipe, M_ATA, sizeof(union ata_request), 4, ata_mpipe_size);
-    mpipe_init(&ch->dma_mpipe, M_DEVBUF, PAGE_SIZE, 4, ata_mpipe_size);
+    mpipe_init(&ch->req_mpipe, M_ATA, sizeof(union ata_request), 
+		4, ata_mpipe_size, 0, NULL);
+    mpipe_init(&ch->dma_mpipe, M_DEVBUF, PAGE_SIZE, 
+		4, ata_mpipe_size, MPF_NOZERO, NULL);
 
     return 0;
     
Index: dev/disk/ata/ata-disk.c
===================================================================
RCS file: /cvs/src/sys/dev/disk/ata/ata-disk.c,v
retrieving revision 1.14
diff -u -r1.14 ata-disk.c
--- dev/disk/ata/ata-disk.c	15 Mar 2004 01:10:42 -0000	1.14
+++ dev/disk/ata/ata-disk.c	19 Mar 2004 19:45:15 -0000
@@ -409,7 +409,7 @@
      * is full, in which case the request will be picked up later when
      * ad_start() is called after another request completes.
      */
-    request = mpipe_alloc(&atadev->channel->req_mpipe, M_NOWAIT|M_ZERO);
+    request = mpipe_alloc_nowait(&atadev->channel->req_mpipe);
     if (request == NULL) {
 	ata_prtdev(atadev, "pipeline full allocating request in ad_start\n");
 	return;
Index: dev/disk/ata/ata-dma.c
===================================================================
RCS file: /cvs/src/sys/dev/disk/ata/ata-dma.c,v
retrieving revision 1.20
diff -u -r1.20 ata-dma.c
--- dev/disk/ata/ata-dma.c	9 Mar 2004 21:39:59 -0000	1.20
+++ dev/disk/ata/ata-dma.c	19 Mar 2004 19:51:17 -0000
@@ -73,11 +73,15 @@
 	return(0);
 
     KKASSERT(ch->dma_mpipe.max_count != 0);
-    atadev->dmastate.dmatab = mpipe_alloc(&ch->dma_mpipe, flags);
-    KKASSERT(((uintptr_t)atadev->dmastate.dmatab & PAGE_MASK) == 0);
+    if (flags & M_RNOWAIT)
+	atadev->dmastate.dmatab = mpipe_alloc_nowait(&ch->dma_mpipe);
+    else
+	atadev->dmastate.dmatab = mpipe_alloc_waitok(&ch->dma_mpipe);
 
-    if (atadev->dmastate.dmatab != NULL)
+    if (atadev->dmastate.dmatab != NULL) {
+	KKASSERT(((uintptr_t)atadev->dmastate.dmatab & PAGE_MASK) == 0);
 	return(0);
+    }
     return(ENOBUFS);
 }
 




