zipping DataSet.WriteXml via stream

Sep 30, 2009 at 1:32 AM

Hi,

I'm trying to zip the output of DataSet.WriteXml via a stream to a zipfile (and of course re-populate a DataSet from a zipped file using DataSet.ReadXml).

The following code creates a zip archive with one entry; however, that entry has zero length.  Any ideas on what I'm doing wrong?

 

DataSet1 ds = new DataSet1();
ds.Pricing.AddPricingRow(new DateTime(2009, 1, 1), "IBM", 100);
ds.Pricing.AddPricingRow(new DateTime(2009, 1, 2), "IBM", 101);
ds.Pricing.AddPricingRow(new DateTime(2009, 1, 3), "IBM", 102);

ds.WriteXml(@"c:\temp\test.xml"); //this works fine

Ionic.Zip.ZipFile zip = new Ionic.Zip.ZipFile();
MemoryStream ms = new MemoryStream();
Ionic.Zip.ZipEntry entry = zip.AddEntry("test.xml", "", ms);

ds.WriteXml(ms);

zip.Save(@"c:\temp\test.xml.zip");
zip.Dispose();

Any pointers greatly appreciated.  Thank you.  WLM

 

Coordinator
Sep 30, 2009 at 3:57 AM

Maybe try ms.Seek(0, SeekOrigin.Begin) before calling zip.Save().

I think you've written to the MemoryStream, but its position is at the end of the stream at the time you call zip.Save(). You want it at the beginning.
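In other words, roughly this (a minimal sketch based on your example above; the point is that the stream position is back at the start when Save() reads it):

DataSet1 ds = new DataSet1();
// ... add the pricing rows as in your example ...

using (MemoryStream ms = new MemoryStream())
using (Ionic.Zip.ZipFile zip = new Ionic.Zip.ZipFile())
{
    ds.WriteXml(ms);                    // leaves the stream position at the end
    ms.Seek(0, SeekOrigin.Begin);       // rewind so the entry can be read from the start
    zip.AddEntry("test.xml", "", ms);
    zip.Save(@"c:\temp\test.xml.zip");  // Save() is when the stream actually gets read
}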

Sep 30, 2009 at 9:54 AM

Beautiful!  Worked like a charm.  Thanks.

Sep 30, 2009 at 4:40 PM

Whoops--I spoke too soon.

Setting the stream position to zero did indeed fix the problem in my simple example above; however, in the "real world" I need to persist very large datasets, and when using the MemoryStream technique above I got an OutOfMemoryException. It looks like I need to write directly to the zip file. Is there a way to accomplish this using streams? I realize I could just call the standard DataSet.WriteXml method, zip the resulting file into an archive, and then delete the original file (roughly the sketch below), but I would prefer to do it in one fell swoop without that workaround. Any thoughts on how to accomplish this are appreciated.
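(For reference, that workaround would look roughly like this, reusing the dataset from my first post; the paths are just illustrative:)

string tmp = @"c:\temp\test.xml";
ds.WriteXml(tmp);                          // write the full XML to disk first
using (Ionic.Zip.ZipFile zip = new Ionic.Zip.ZipFile())
{
    zip.AddFile(tmp, "");                  // add the file at the root of the archive
    zip.Save(@"c:\temp\test.xml.zip");
}
System.IO.File.Delete(tmp);                // clean up the intermediate file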

Many thanks,
WLM

Coordinator
Sep 30, 2009 at 5:31 PM
Edited Oct 1, 2009 at 5:03 AM

When ZipFile.Save() is called, all entries are written out to the output. For any entries added as streams with ZipFile.AddEntry(), Save() calls Read() on those streams. In other words, ZipFile requires a stream that can be read. On the other hand, DataSet can put its data only onto a stream that accepts Write(); DataSet requires a stream that can be written.

What you need is two threads and an adapter stream - something like the BlockingStream Stephen Toub presented in MSDN Magazine. Except I think his implementation allocates an unlimited number of buffers, which may get you into the out-of-memory condition again. What you want is a BlockingStream with a finite set of buffers.

I previously had a need for something like that, so I wrote one; it's enclosed below. To use it:

 

// create the AdapterStream before starting either thread
AdapterStream adapter = new AdapterStream(8192);  // 8k circular buffer

// thread #1 - the writer
dataset.WriteXml(adapter);
adapter.BlockSync(); // signal that no more data will be written

// thread #2 - the reader
using (var zip = new ZipFile())
{
  zip.AddEntry("mydata.xml", "", adapter);
  zip.Save(whatever);
}

 

AdapterStream looks like this:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace Cheeso.IO
{
    /// <summary>
    /// A Stream for passing data between a producer thread and a consumer thread.
    /// </summary>
    ///
    /// <remarks>
    ///   Probably want this to use at least 2 blocks, each with a
    ///   separate lock, to allow more concurrency.
    /// </remarks>
    internal class AdapterStream : Stream
    {
        /// <summary>Used as a monitor to protect the shared state accessed by Read and Write.</summary>
        private object _lockForAll;
        
        private byte[] _circularBuffer;
        private int _posnWrite;
        private int _posnRead;
        private int _bytesAvailable;

        
        /// <summary>
        ///   Event used to signal when no more data will be written to the stream.
        /// </summary>
        private ManualResetEvent _blockSync;
        
        /// <summary>
        ///   Event used to signal when the circular buffer has space
        ///   available for writing.
        /// </summary>
        private ManualResetEvent _spaceAvailable;
        
        /// <summary>
        ///   Event used to signal that data is available for reading.
        /// </summary>
        private ManualResetEvent _dataAvailable;
        
        /// <summary>Array of events waited on by readers.</summary>
        private WaitHandle[] _readEvents;
        
        /// <summary>The index into _readEvents for the _blockSync event.</summary>
        private int _blockSyncIndex;


        /// <summary>Initializes the blocking stream.</summary>
        public AdapterStream(int size)
        {
            _circularBuffer= new byte[size];
            _blockSync = new ManualResetEvent(false);
            _dataAvailable = new ManualResetEvent(false);
            _spaceAvailable = new ManualResetEvent(true);
            _readEvents = new WaitHandle[] { _dataAvailable, _blockSync };
            _blockSyncIndex= 1;
            _lockForAll = new object();
        }

        
        /// <summary>Determines whether data can be read from the stream.</summary>
        public override bool CanRead { get { return true; } }
        /// <summary>Determines whether the stream can be seeked.</summary>
        public override bool CanSeek { get { return false; } }
        /// <summary>Determines whether data can be written to the stream.</summary>
        public override bool CanWrite { get { return true; } }

        /// <summary>Flush is a no-op.</summary>
        public override void Flush() { }
        
        /// <summary>Not supported.</summary>
        public override long Length { get { throw new NotSupportedException(); } }
        
        /// <summary>Not supported.</summary>
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        /// <summary>Not supported.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        { throw new NotSupportedException(); }

        
        /// <summary>Not supported.</summary>
        public override void SetLength(long value) { throw new NotSupportedException(); }

        
        /// <summary>
        /// Reads a sequence of bytes from the stream.  Read will block
        /// until at least one byte is available for reading or until
        /// the stream has been ended for writing.
        /// </summary>
        /// <param name="buffer">
        /// An array of bytes. When Read returns, the buffer contains the specified
        /// byte array with the values between offset and (offset + count - 1) replaced
        /// by the bytes read from the current source.
        /// </param>
        /// <param name="offset">
        /// The zero-based byte offset in buffer at which to begin storing the data read
        /// from the current stream.
        /// </param>
        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
        /// <returns>
        /// The total number of bytes read into the buffer. This can be less than the
        /// number of bytes requested if that many bytes are not currently available,
        /// or zero if the end of the stream has been reached.
        /// </returns>
        /// <remarks>This method is thread-safe.</remarks>
        public override int Read(byte[] buffer, int offset, int count)
        {
            // Validate all arguments and instance state
            if (buffer == null) throw new ArgumentNullException("buffer");
            if (offset < 0 || offset >= buffer.Length) throw new ArgumentOutOfRangeException("offset");
            if (count < 0 || offset + count > buffer.Length) throw new ArgumentOutOfRangeException("count");
            if (_dataAvailable == null) throw new ObjectDisposedException(GetType().Name);

            // If no data has been requested, we don't need to do anything
            if (count == 0) return 0;
            
            // Loop until we get data or until writing has completed
            while (true)
            {
                // Wait for either data to be available or for the sequence to be
                // completed.
                int handleIndex = WaitHandle.WaitAny(_readEvents);
                
                lock (_lockForAll)
                {
                    // If nothing is available, loop around to wait again.
                    // This is the blocking part. 
                    if (_bytesAvailable==0)
                    {
                        if (handleIndex == _blockSyncIndex)
                        {
                            // All done with this item in the sequence,
                            // nothing more to read.

                            // Signal that the writer can begin writing again (new sequence).
                            _spaceAvailable.Set();

                            // Turn off the sequence break. It's been handled. 
                            _blockSync.Reset();
                            return 0; // EOF
                        }

                        // In this case, there's no data available to Read, but the
                        // writer hasn't completed the writes on this sequence. So
                        // loop around, block, and try to read again. This should
                        // never happen, because _bytesAvailable should be nonzero
                        // if _dataAvailable is signalled.  No harm though.
                        continue;
                    }
                    
                    // Read as many bytes as possible
                    int bytesToRead= (_bytesAvailable > count) ? count : _bytesAvailable;
                    
                    // Will we need to wrap around in the circ buffer to satisfy the read?
                    if (_posnRead + bytesToRead > _circularBuffer.Length)
                    {
                        // yes - need to wrap around 
                        int thisBatch = _circularBuffer.Length - _posnRead;
                        Buffer.BlockCopy(_circularBuffer, _posnRead, buffer, offset, thisBatch);
                        _posnRead = 0;
                        offset += thisBatch; // matters only if we loop until the read is done.
                        thisBatch = bytesToRead - thisBatch;
                        Buffer.BlockCopy(_circularBuffer, _posnRead, buffer, offset, thisBatch);
                        _posnRead += thisBatch;
                    }
                    else
                    {
                        // Satisfy the read without wrap.
                        Buffer.BlockCopy(_circularBuffer, _posnRead, buffer, offset, bytesToRead);
                        _posnRead += bytesToRead;
                    }
                    
                    _bytesAvailable -= bytesToRead;
                    if (_bytesAvailable == 0) _dataAvailable.Reset();
                    
                    _spaceAvailable.Set();
                    
                    return bytesToRead;
                }
            }
        }
        

        /// <summary>Writes a sequence of bytes to the stream.</summary>
        /// <param name="buffer">An array of bytes. Write copies count bytes from buffer to the stream.</param>
        /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes to the stream.</param>
        /// <param name="count">The number of bytes to be written to the current stream.</param>
        /// <remarks>This method is thread-safe, and will block until the write can be completed.</remarks>
        public override void Write(byte[] buffer, int offset, int count)
        {
            // Validate all arguments and instance state
            if (buffer == null) throw new ArgumentNullException("buffer");
            if (offset < 0 || offset >= buffer.Length) throw new ArgumentOutOfRangeException("offset");
            if (count < 0 || offset + count > buffer.Length) throw new ArgumentOutOfRangeException("count");
            if (_dataAvailable == null) throw new ObjectDisposedException(GetType().Name);

            // If no data is being written, nothing to do.
            if (count == 0) return;

            while (count > 0)
            {
                // Wait for space to become available.
                _spaceAvailable.WaitOne();
                
                // Copy the written data into the circularBuffer.
                // Make sure to signal to any readers that data is available.
                lock (_lockForAll)
                {
                    int bufferSpaceAvailable = (_posnRead <= _posnWrite)
                        ? _circularBuffer.Length - _posnWrite
                        : _posnRead - _posnWrite - 1;

                    int bytesToWrite = (count < bufferSpaceAvailable) ? count : bufferSpaceAvailable;
                    
                    Buffer.BlockCopy(buffer, offset, _circularBuffer, _posnWrite, bytesToWrite);
                    count -= bytesToWrite;
                    offset += bytesToWrite; 
                    _bytesAvailable += bytesToWrite;
                    _posnWrite += bytesToWrite;
                    if (_posnWrite >= _circularBuffer.Length)
                        _posnWrite = 0;
                    _dataAvailable.Set();

                    if (_posnWrite==_posnRead)
                        _spaceAvailable.Reset();
                }
            }
        }

        /// <summary>
        /// Called by the writer to signal to the reader that a block of
        /// data is complete.
        /// </summary>
        ///
        /// <remarks>
        /// This should be called by the writer. This method is thread-safe. Subsequent
        /// calls to Write will block until the reader completes all reads of the
        /// just-completed block.
        /// </remarks>
        public void BlockSync()
        {
            if (_dataAvailable == null) throw new ObjectDisposedException(GetType().Name);
            lock (_lockForAll)
            {
                // This will signal the reader that there's a break.
                _blockSync.Set();

                // Resetting this makes the next Write() block until the
                // reader reaches EOF. We don't want the writer filling the
                // buffer again until the reader has finished reading this block.
                _spaceAvailable.Reset();
            }
        }


        
        private void FreeEvent(ref ManualResetEvent e)
        {
            if (e != null)
            {
                e.Close();
                e = null;
            }
        }


        
        /// <summary>Closes the stream and releases all resources associated with it.</summary>
        /// <remarks>This method is not thread-safe.</remarks>
        public override void Close()
        {
            base.Close();
            
            // Free the events
            FreeEvent(ref _dataAvailable);
            FreeEvent(ref _spaceAvailable);
            FreeEvent(ref _blockSync);
        }
    }
}

Oct 1, 2009 at 12:14 AM

Hi Cheeso,

Thanks for the reply and code. I understand what you are saying at a high level, but the ability to create something like your AdapterStream is beyond my meager skills. Unfortunately, the AdapterStream class as written does not compile: in the Read method, the bytesToCopy variable in "return bytesToCopy" is not recognized. Perhaps you meant _bytesAvailable?

I also think some of the methods that are stubbed out with NotSupportedExceptions are being called behind the scenes by the ZipFile class.

I attempted to get this running on different threads, but I think there are some other things going on in addition to the fact that I probably didn't implement the threading properly.

Best regards,
WLM

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Ionic.Zip;
using System.Threading;
using Cheeso.IO;    // namespace of the AdapterStream class above

namespace CompressionTest
{
	class DataStore
	{
		private DataSet1 m_ds;
		private AdapterStream m_as;

		public void Write()
		{
			m_ds = new DataSet1();
			m_ds.Pricing.AddPricingRow(new DateTime(2009, 1, 1), "IBM", 100);
			m_ds.Pricing.AddPricingRow(new DateTime(2009, 1, 2), "IBM", 101);
			m_ds.Pricing.AddPricingRow(new DateTime(2009, 1, 3), "IBM", 102);

			m_ds.WriteXml(@"c:\temp\test.xml"); //this works fine

			Thread t = new Thread(new ThreadStart(Thread1));
			t.Start();

			t = new Thread(new ThreadStart(Thread2));
			t.Start();

		}

		private void Thread1()
		{
			m_as = new AdapterStream(8192);  // 8k buffer
			m_ds.WriteXml(m_as);
			m_as.BlockSync(); // returns when ZipFile.Save() completes
		}

		private void Thread2()
		{
			using (var zip = new ZipFile())
			{
				zip.AddEntry("test.xml", "", m_as);
				zip.Save(@"c:\temp\test.xml.zip");
			}
		}
	}
}

Coordinator
Oct 1, 2009 at 5:15 AM
Edited Oct 1, 2009 at 5:16 AM

Hey WLM, ah, that code I posted was not quite right. I've fixed it and updated it in the post above; it compiles now :). Anyway, this is the code I used to verify that the idea works:

private Cheeso.IO.AdapterStream as1;
private DataSet ds1;

private void SaveDataset(object ignored)
{
    System.Console.WriteLine("writing dataset to as1...");
    ds1.WriteXml(as1); 
    as1.BlockSync(); 
}

public void Run()
{
    System.Data.SqlClient.SqlConnection c1= new System.Data.SqlClient.SqlConnection(connstring1);

    System.Data.SqlClient.SqlDataAdapter da = new System.Data.SqlClient.SqlDataAdapter();
    da.SelectCommand=  new System.Data.SqlClient.SqlCommand(strSelect);
    da.SelectCommand.Connection= c1;

    System.Console.WriteLine("Create dataset...");

    ds1 = new DataSet();
    System.Console.WriteLine("Fill dataset...");
    da.Fill(ds1, "Invoices");

    System.Console.WriteLine("Save dataset to xml file...");
    ds1.WriteXml(xmlOutputFile); //this works fine

    as1 = new Cheeso.IO.AdapterStream(8192);

    ThreadPool.QueueUserWorkItem(new WaitCallback(SaveDataset), null);

    using(Ionic.Zip.ZipFile zip = new Ionic.Zip.ZipFile())
    {
        Ionic.Zip.ZipEntry entry = zip.AddEntry(zipEntryName, "", as1);
        zip.Save(zipFileName);
    }

    // compare datasets
    DataSet ds2 = new DataSet();
    ds2.ReadXml(xmlOutputFile);

    DataSet ds3 = new DataSet();
    using(Ionic.Zip.ZipFile zip = Ionic.Zip.ZipFile.Read(zipFileName))
    {
        // load a DataSet directly from a Stream created for the entry in the zip
        var s = zip[zipEntryName].OpenReader();
        ds3.ReadXml(s);
    }

    CompareDatasets(ds2, ds3);
}

The DataSet is written to the AdapterStream within SaveDataset(); each call to AdapterStream.Write() buffers the data. At the same time, zip.Save() calls Read() on the stream and pulls the data out of the buffer.

Oct 2, 2009 at 9:57 AM

Cheeso,

Wow, thanks for taking the time to straighten that out. I did get one NotSupportedException in the Length property of AdapterStream, but haven't been able to replicate it since. That said, I set a breakpoint and the ZipFile class is definitely calling the Length property and handling the exception. I changed it to return _circularBuffer.Length with no ill effects as far as I can tell. Otherwise it works great. Hopefully these postings will help someone else out down the road. Thanks.

WLM

Coordinator
Oct 2, 2009 at 4:58 PM

> return _circularBuffer.Length

That is not the correct value to return for the stream. 

For some streams, such as non-seekable (forward-only) read-only streams, the Stream.Length property is only knowable after all bytes have actually been read.  The AdapterStream is one such stream type.  The DotNetZip library is aware that this occurs for some stream types, which is why it handles the exception thrown. 

It isn't necessary, and it is incorrect, to change the Length property to return _circularBuffer.Length.

It could result in a broken zip file; in other words, a ZipEntry might not contain all the bytes from your dataset.

good luck.

 

Oct 2, 2009 at 8:42 PM

Understood. I'll quickly change that back to the way it was. Some of my XML files are hundreds of megabytes; they compress down nicely now, usually by around 95%.

Coordinator
Oct 2, 2009 at 9:08 PM

XML is very compressible.  Particularly DataSet XML.

Coordinator
Oct 4, 2009 at 6:57 AM

WLM,

followup.  I modified DotNetZip to add a new feature to deal with this situation more cleanly.

If you download v1.9.0.12 (available now)  you will be able to do something like this to save a dataset into a zip file, without writing a file to the filesystem.

DataSet ds1 = new DataSet();
da.Fill(ds1, "Invoices");
using(Ionic.Zip.ZipFile zip = new Ionic.Zip.ZipFile())
{
    zip.AddEntry(zipEntryName, (name,stream) => ds1.WriteXml(stream) );
    zip.Save(zipFileName);
}

Using that programming model requires .NET 3.5, for the lambda syntax.

I recommend using this new feature over the AdapterStream thing I sent you.

Oct 5, 2009 at 1:55 AM

Works perfectly.  Very nice to have it all contained within DotNetZip, hiding the complexity from neophytes like me.

Thanks Cheeso.

 

Oct 5, 2009 at 7:27 AM

Just to clarify: can we close the supplied stream at the end of the write? Many Xxx.Write(stream) methods in the framework close the stream when they finish.

Coordinator
Oct 5, 2009 at 3:36 PM
Edited Oct 5, 2009 at 3:50 PM

GM,

what are you asking here?  Do you want the application to be responsible for closing the stream?  Or do you want DotNetZip to be responsible for closing the stream?  Or ... are you simply asking for clarification on the way it works?

The way it works now is that DotNetZip opens and configures the stream before providing it to the application via the delegate. DotNetZip also takes responsibility for closing the stream after the delegate returns.

On the other hand, the doc for System.IO.Stream says that

If the stream is already closed, a call to Close throws no exceptions.

Do you have a particular implementation of xxx.Write(stream) that closes the stream and causes a problem?
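For example, the scenario I understand you to be asking about would be roughly this (a hypothetical sketch; the entry name and the use of StreamWriter are just illustration): the StreamWriter closes the stream that DotNetZip handed to the delegate when it is disposed. Per the Stream doc quoted above, the additional Close that DotNetZip performs after the delegate returns won't throw.

using (var zip = new Ionic.Zip.ZipFile())
{
    zip.AddEntry("report.xml", (name, stream) =>
    {
        // StreamWriter closes the underlying stream when disposed;
        // DotNetZip calls Close again after the delegate returns.
        using (var w = new System.IO.StreamWriter(stream))
            w.WriteLine("<data/>");
    });
    zip.Save(@"c:\temp\report.zip");
}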

 

Oct 6, 2009 at 1:34 PM

I wanted to know whether it is allowed to close the supplied stream, because there are a lot of existing framework methods that could be used here, and many of them close the stream.

My understanding now is: there is no problem.