$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r63910 - in sandbox/transaction: boost/transact/detail libs/transact/perf libs/transact/test
From: bob.s.walters_at_[hidden]
Date: 2010-07-11 22:38:15
Author: bobwalters
Date: 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
New Revision: 63910
URL: http://svn.boost.org/trac/boost/changeset/63910
Log:
Windows direct and OS-buffered ofile with directory synchronization.
Refined filewriteperf test
Text files modified: 
   sandbox/transaction/boost/transact/detail/buffering_file.hpp |    73 ++++++++++++++++++++++++---------       
   sandbox/transaction/boost/transact/detail/file.hpp           |    84 +++++++++++++++++++++++++++++++-------- 
   sandbox/transaction/boost/transact/detail/syncing_file.hpp   |    85 ++++++++++++++++++--------------------- 
   sandbox/transaction/libs/transact/perf/filewrite.cpp         |    56 +++++++++++++++++--------               
   sandbox/transaction/libs/transact/test/Jamfile.v2            |     2                                         
   5 files changed, 196 insertions(+), 104 deletions(-)
Modified: sandbox/transaction/boost/transact/detail/buffering_file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/buffering_file.hpp	(original)
+++ sandbox/transaction/boost/transact/detail/buffering_file.hpp	2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -17,25 +17,51 @@
 namespace transact{
 namespace detail{
 
+template<std::size_t Capacity, bool direct_io> 
+struct ofile_buffer {
+	static const bool direct = direct_io;
+	char data[Capacity];
+};
+
+#ifdef _WIN32
+// Direct I/O supported on Windows given that there are only two ways
+// to achieve synchronized sequential disk I/O - flush the system I/O buffers
+// or use Windows direct I/O.  Not supported on any other OS.  Although O_DIRECT
+// supported on many, strong
+template<std::size_t Capacity>
+struct ofile_buffer<Capacity,true> {
+	static const bool direct = true;
+	char *data;
+
+	ofile_buffer() {
+		// direct I/O requires pagesize alignment.  This assumes
+		// Capacity is > pagesize, but not necessarily a multiple of it.
+		int alignment=1; // largest power of 2 <= Capacity
+		for (std::size_t i=Capacity; (i>>=1); alignment<<=1 ) 
+			;
+		data = (char*)_aligned_malloc(Capacity, alignment);
+	};
+	~ofile_buffer() {
+		_aligned_free(data);
+	};
+};
+#endif
+
+
 template<class Base,std::size_t Capacity>
 class buffering_seq_ofile{
 public:
     typedef typename Base::size_type size_type;
+	static const bool direct_io = Base::has_direct_io;
+	
     explicit buffering_seq_ofile(std::string const &name)
         : base(name)
-        , size(0){
-#ifdef _WIN32
-		// support possibility that Base is using unbuffered I/O
-		// requiring specific buffer memory alignment
-		int alignment=1; // largest power of 2 >= Capacity
-		for (std::size_t i=Capacity; (i>>=1); alignment<<=1 )
-		buffer = (char*)_aligned_malloc(Capacity, alignment);
-#endif
-	}
+        , size(0)
+		{ }
     template<class Size>
     void write(void const *data,Size s){
         if(this->size + s <= Capacity){
-            std::memcpy(this->buffer+this->size,data,s);
+            std::memcpy(this->buffer.data+this->size,data,s);
             this->size+=s;
         }else this->write_overflow(data,s);
     }
@@ -58,18 +84,27 @@
             std::cerr << "ignored exception" << std::endl;
 #endif
         }
-#ifdef _WIN32
-		_aligned_free(buffer);
-#endif
     }
 private:
     void write_overflow(void const *data,std::size_t s){
         BOOST_ASSERT(this->size + s > Capacity);
-        if(this->size == 0){
+		if (direct_io) {
+			while (this->size + s > Capacity) {
+				std::size_t write=Capacity - this->size;
+				std::memcpy(this->buffer.data+this->size,data,write);
+				this->size=Capacity;
+				this->flush_buffer();
+				data = static_cast<char const *>(data)+write;
+				s-=write;
+			}
+			if (s) {
+				this->write(data,s);
+			}
+		}else if(this->size == 0){
             this->base.write(data,s);
         }else{
             std::size_t write=Capacity - this->size;
-            std::memcpy(this->buffer+this->size,data,write);
+            std::memcpy(this->buffer.data+this->size,data,write);
             this->size=Capacity;
             this->flush_buffer();
             this->write(static_cast<char const *>(data)+write,s-write);
@@ -77,17 +112,13 @@
     }
     void flush_buffer(){
         if(this->size > 0){
-            this->base.write(this->buffer,this->size);
+            this->base.write(this->buffer.data,this->size);
             this->size=0;
         }
     }
 
     Base base;
-#ifdef _WIN32
-	char *buffer;
-#else
-	char buffer[Capacity];
-#endif
+	ofile_buffer<Capacity,direct_io> buffer;
         std::size_t size;
 };
 
Modified: sandbox/transaction/boost/transact/detail/file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/file.hpp	(original)
+++ sandbox/transaction/boost/transact/detail/file.hpp	2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -19,65 +19,110 @@
 #ifdef _WIN32
 #include <Windows.h>
 #include <WinBase.h>
+
+#include <strsafe.h>
+
+static void throw_io_failure(char const* function) 
+{ 
+	// Retrieve the system error message for the last-error code
+	DWORD dw = GetLastError(); 
+
+	LPVOID lpMsgBuf;
+	FormatMessage(	FORMAT_MESSAGE_ALLOCATE_BUFFER | 
+					FORMAT_MESSAGE_FROM_SYSTEM |
+					FORMAT_MESSAGE_IGNORE_INSERTS,
+					NULL,
+					dw,
+					MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+					(LPTSTR) &lpMsgBuf,
+					0, NULL );
+	
+	// Print the error message to std::cerr, then throw io_failure
+	// TODO - should be incorporated into io_failure what().
+	std::cerr << function << " failed with error " << dw << ": " << (char*)lpMsgBuf << std::endl;
+
+	LocalFree(lpMsgBuf);
+	throw io_failure();
+}	
         
 // low-level ofile representation for WIN32
+template <bool direct_io = false>
 class ofile {
 public:
         typedef unsigned int size_type;
-
+	static const bool has_direct_io = direct_io;
+	
         void* filedes;
         
         ofile(std::string const &name) : filedes(INVALID_HANDLE_VALUE) {
                 unsigned long access = GENERIC_READ | GENERIC_WRITE;
                 unsigned long creation_flags = OPEN_ALWAYS;
-		unsigned long flags = FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+		unsigned long flags = 0;
+		if ( direct_io )
+			flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+
                 this->filedes = CreateFileA(name.c_str(), access,
+									FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+									0, creation_flags, flags, 0);
+		if (this->filedes == INVALID_HANDLE_VALUE )
+			throw_io_failure("CreateFileA");
+		
+		//make sure the directory entry has reached the disk:
+		std::string dirname=filesystem::system_complete(name).parent_path().external_directory_string();
+
+		creation_flags = OPEN_EXISTING;
+		flags = FILE_FLAG_BACKUP_SEMANTICS;
+		void *dirfiledes = CreateFileA(dirname.c_str(), access,
                                                                                 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
                                                                                 0, creation_flags, flags, 0);
-		if (this->filedes == INVALID_HANDLE_VALUE ) {
-			throw io_failure();
-		}
-		std::cerr << "File opened" << std::endl;
+		if (dirfiledes == INVALID_HANDLE_VALUE )
+			throw_io_failure("CreateFileA");
+		if(!FlushFileBuffers(dirfiledes)) 
+			throw_io_failure("FlushFileBuffers");
+		if(!CloseHandle(dirfiledes)) 
+			throw_io_failure("CloseHandle");
         }
         
         ~ofile() {
-		if(this->filedes != INVALID_HANDLE_VALUE) CloseHandle(this->filedes);
+		if(this->filedes != INVALID_HANDLE_VALUE) 
+			CloseHandle(this->filedes);
         }
         
         void seek(size_type const &s) {
                 LARGE_INTEGER loc;
                 loc.QuadPart = s;
-		if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0) {
-			std::cerr << "SetFilePointerEx == 0" << std::endl;
-			throw io_failure();
-		}
+		if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0)
+			throw_io_failure("SetFilePointerEx");
         }
         
         size_type write(const char *data, size_type const &size) {
                 DWORD written;
-		if(WriteFile(this->filedes, data, size, &written, 0) == 0) {
-			std::cerr << "WriteFile == 0" << std::endl;
-			throw io_failure();
-		}
+		if(WriteFile(this->filedes, data, size, &written, 0) == 0)
+			throw_io_failure("WriteFile");
                 return (size_type)written;
         }
         
-	void sync() { }
+	void sync() { 
+		if (!direct_io && FlushFileBuffers(this->filedes) == 0)
+			throw_io_failure("FlushFileBuffers");
+	}
 };
 
 #else
         
 #include <unistd.h>
 #include <fcntl.h>
-	
+
 #ifndef _POSIX_SYNCHRONIZED_IO
 #error no POSIX synchronized IO available
 #endif
 
 // low-level ofile for Linux/Unix
+template <bool direct_io = false> // ignored on Posix API.
 class ofile {
 public:
         typedef unsigned int size_type;
+	static const bool has_direct_io = direct_io;
 
         int filedes;
         
@@ -88,6 +133,7 @@
 #endif
                 this->filedes= open(name.c_str(),flags,S_IRUSR | S_IWUSR);
                 if(this->filedes==-1) throw io_failure();
+		
                 { //make sure the directory entry has reached the disk:
                         std::string dirname=filesystem::path(name).directory_string();
                         if(dirname.empty()) dirname=".";
@@ -121,7 +167,9 @@
 };
 
 #endif
-		
+
+typedef ofile<true> direct_ofile;
+	
 }
 }
 }
Modified: sandbox/transaction/boost/transact/detail/syncing_file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/syncing_file.hpp	(original)
+++ sandbox/transaction/boost/transact/detail/syncing_file.hpp	2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -15,78 +15,73 @@
 #include <boost/static_assert.hpp>
 #include <boost/assert.hpp>
 #include <boost/config.hpp>
-#include <boost/transact/detail/file.hpp>
+#include <boost/transact/detail/buffering_file.hpp>
 
 namespace boost{
 namespace transact{
 namespace detail{
 
+
+template <class Base>
 class syncing_seq_ofile{
 public:
     typedef unsigned int size_type;
-    explicit syncing_seq_ofile(std::string const &name);
-    void write(void const *data,std::size_t size);
+	static const bool has_direct_io = Base::has_direct_io;
+	
+    explicit syncing_seq_ofile(std::string const &name)
+		: pos(0)
+		, base(name){
+			this->write_ahead(0,write_ahead_size);
+		}
+    void write(void const *data,std::size_t size){
+		size_type const s=this->pos % write_ahead_size;
+		if(s + size >= write_ahead_size){ //there must be at least one 0 at the end, so also write ahead if this is equal.
+			size_type start=this->pos - s + write_ahead_size;
+			size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
+			BOOST_ASSERT(end > start);
+			this->write_ahead(start,end);
+		}
+		
+		std::size_t ret= base.write((char const *)data,size);
+		if(ret > 0) this->pos+=ret;
+		if(ret != std::size_t(size)) throw io_failure();
+	}
     size_type position() const{ return this->pos; }
-    void flush();
-    void sync();
+    void flush() {}
+    void sync() {
+		base.sync();
+	}
 private:
     size_type pos;
-	ofile  filedes;
+	Base base;
 
-private:
     void write_ahead(size_type const &start,size_type const &end){
         BOOST_ASSERT(start % write_ahead_size == 0);
         BOOST_ASSERT(end % write_ahead_size == 0);
         BOOST_STATIC_ASSERT(write_ahead_size % page_size == 0);
-        filedes.seek(start);
+        base.seek(start);
         for(size_type off=start;off < end;off+=page_size){
-            filedes.write(empty_page.data,page_size);
+            base.write(empty_page.data,page_size);
         }
-		filedes.sync();
-		filedes.seek(this->pos);
+		base.sync();
+		base.seek(this->pos);
     }
 
     static std::size_t const write_ahead_size=10*1024*1024;
     static std::size_t const page_size=4096;
 
-    struct empty_page_type{
-        empty_page_size(){
-            std::memset(data,0,page_size);
+    struct empty_page_type : public ofile_buffer<page_size,has_direct_io> {
+		typedef ofile_buffer<page_size,has_direct_io> base_buffer;
+        empty_page_type() : base_buffer() {
+            std::memset(base_buffer::data,0,page_size);
         }
-        char data[page_size];
-    }
+    };
+	
     static empty_page_type empty_page;
-    int filedes;
 };
 
-syncing_seq_ofile::empty_page_type syncing_seq_ofile::empty_page;
-
-inline syncing_seq_ofile::syncing_seq_ofile(std::string const &name)
-    : pos(0)
-    , filedes(name){
-    this->write_ahead(0,write_ahead_size);
-}
-
-void syncing_seq_ofile::write(void const *data,std::size_t size){
-    size_type const s=this->pos % write_ahead_size;
-    if(s + size >= write_ahead_size){ //there must be at least one 0 at the and, so also write ahead if this is equal.
-        size_type start=this->pos - s + write_ahead_size;
-        size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
-        BOOST_ASSERT(end > start);
-        this->write_ahead(start,end);
-    }
-
-	std::size_t ret= filedes.write((char const *)data,size);
-    if(ret > 0) this->pos+=ret;
-    if(ret != std::size_t(size)) throw io_failure();
-}
-
-
-inline void syncing_seq_ofile::flush(){}
-	
-inline void syncing_seq_ofile::sync(){
-	filedes.sync();
-}
+template<class Base>
+typename syncing_seq_ofile<Base>::empty_page_type syncing_seq_ofile<Base>::empty_page;
 
 
 }
Modified: sandbox/transaction/libs/transact/perf/filewrite.cpp
==============================================================================
--- sandbox/transaction/libs/transact/perf/filewrite.cpp	(original)
+++ sandbox/transaction/libs/transact/perf/filewrite.cpp	2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -14,21 +14,20 @@
 #include <boost/transact/detail/buffering_file.hpp>
 #include <boost/transact/detail/sectorizing_file.hpp>
 #include <boost/transact/detail/syncing_file.hpp>
+#include <boost/transact/detail/file.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 
-static const int num_sets=100;
-static const int txns_per_set=1000;
-static const int txn_size=3000;  // typical transaction log size
-static int total_txns = 0;
-static int total_bytes = 0;
+static const int loop_size=1000;
 
-static char txn_buffer[ txn_size ]; // random data set
+static char txn_buffer[ 10000 ]; // random data set
 
 using namespace boost::transact;
+using namespace boost::posix_time;
 
 typedef detail::sectorizing_seq_ofile<
                         detail::aligning_seq_ofile<
                                 detail::buffering_seq_ofile<
-					detail::syncing_seq_ofile,
+					detail::syncing_seq_ofile< detail::ofile<false> >,
                                         8192
 				>
 			>
@@ -36,23 +35,42 @@
 ofile_t;
 
 
-void log_a_set(ofile_t &outfile) {
-	for (int i=0; i<txns_per_set; i++) {
+typedef detail::sectorizing_seq_ofile<
+			detail::aligning_seq_ofile<
+				detail::buffering_seq_ofile<
+					detail::syncing_seq_ofile< detail::direct_ofile >,
+					8192
+				>
+			>
+		>
+direct_ofile_t;
+
+template <class file_t>
+void filetest1(const char *filename, size_t txn_size) {
+	file_t outfile(filename);	
+	
+	ptime start = microsec_clock::local_time();	
+	for (int i=0; i<loop_size; i++) {
                 outfile.write(txn_buffer, txn_size);
+		outfile.sync();
         }
-	total_txns += txns_per_set;
-	total_bytes += (txns_per_set * txn_size); 
-	std::cout << "Written " << total_txns << " txns, " << total_bytes << " bytes" << std::endl;
-}
+	ptime end = microsec_clock::local_time();
 
-void filetest1() {
-	ofile_t outfile("filetest1.out");
-	for (int i=0; i<num_sets; i++) {
-		log_a_set(outfile);
-	}
+	std::cout << "Written " << loop_size << " txns, " 
+			<< loop_size*txn_size << " bytes, in "
+			<< (end-start) << " microseconds"
+			<< std::endl;
 }
 
+
 int main(int, const char *[]){
-    filetest1();
+	// write loop_size transactions to disk, each 3k in size.
+    filetest1<ofile_t>("filetest1.out", 3000);
+    filetest1<direct_ofile_t>("filetest2.out", 3000);
+
+	// write loop_size transactions to disk, each 10k in size.
+	filetest1<ofile_t>("filetest3.out", 10000);
+    filetest1<direct_ofile_t>("filetest4.out", 10000);
+
     return 0;
 }
Modified: sandbox/transaction/libs/transact/test/Jamfile.v2
==============================================================================
--- sandbox/transaction/libs/transact/test/Jamfile.v2	(original)
+++ sandbox/transaction/libs/transact/test/Jamfile.v2	2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -51,7 +51,7 @@
 
     alias filewriteperf
     :
-        [ run ../perf/filewrite.cpp :  :  : <library>/boost//date_time <library>/boost//system <link>static ]
+        [ run ../perf/filewrite.cpp :  :  : <library>/boost//filesystem <library>/boost//date_time <library>/boost//system <link>static ]
     ;
         
     alias all