Sending BLOB Data to SQL Server Using IRowsetFastLoad and ISequentialStream

Steve Hale

SQL Server Native Client Development Team

Microsoft Corporation

Recently there have been several requests for samples showing how to use IRowsetFastLoad to send BLOB data whose length varies per row, or how to stream data to SQL Server via SQLOLEDB or SQLNCLI. Most of the available samples are too simplistic: they don't demonstrate a data length that varies per row, or the use of ISequentialStream.

In this sample, you'll see both techniques in one source file. By default, the sample uses IRowsetFastLoad to send variable-length BLOB data per row using in-line bindings. In this case, the in-line BLOB data must fit in available memory. This method performs better when the BLOB data ranges from a few bytes to a few MB, because there is no additional stream overhead. For larger data, especially when the data is not all available at once in a single block, streaming is a better choice.
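With an in-line binding, every row buffer reserves a fixed slot as large as the biggest BLOB (cbMaxLen), and the length part of the binding tells the provider how many bytes of that slot are real data for the current row. The following is a minimal, portable sketch of that idea, assuming a struct-based layout instead of the sample's computed offsets; InlineBlobRow, fillInlineRow and kMaxBlob are hypothetical names, and the fixed-width fields only stand in for DBSTATUS and DBLENGTH.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for the sample's MAX_BLOB
constexpr std::size_t kMaxBlob = 200;

// Illustrative row layout: status, length, and a fixed-size value slot.
// uint32_t/uint64_t stand in for DBSTATUS and DBLENGTH (64-bit build assumed).
struct InlineBlobRow {
    std::uint32_t status;
    std::uint64_t length;
    unsigned char value[kMaxBlob];  // sized for the largest BLOB any row can carry
};

// Fill one row the way the sample's loop does: row iRow carries
// kMaxBlob / (iRow + 1) bytes of 0x23, and the rest of the slot is zeroed.
void fillInlineRow(InlineBlobRow& row, std::size_t iRow) {
    const std::size_t cb = kMaxBlob / (iRow + 1);
    row.status = 0;  // DBSTATUS_S_OK has the value 0
    row.length = cb; // only this many bytes of the slot are sent
    std::memset(row.value, 0, sizeof(row.value));
    std::memset(row.value, 0x23, cb);
}
```

The slot size is fixed when the accessor is created, while the length varies per row; because the whole slot has to live in the row buffer, this approach only suits BLOBs that comfortably fit in memory.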

When you uncomment #define USE_ISEQSTREAM, the sample uses ISequentialStream instead. The stream implementation is defined in the sample and can send BLOB data of any size; to try a different size, change MAX_BLOB. In this case, the stream data does not have to fit in available memory or be available in one block. The sample calls IRowsetFastLoad::InsertRow, passing a pointer to the stream implementation in the data buffer (at the rgBinding[2].obValue offset) along with the amount of data available to read from the stream (at the rgBinding[2].obLength offset). Some providers may not require the length of the data to be known when binding occurs; in that case, the length may be omitted from the binding.
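The reason MAX_BLOB can be any size in stream mode is that the sample's Read method manufactures the requested number of bytes by cycling over a small source buffer (1024 bytes in wmain). Below is a minimal portable sketch of that logic without COM; WrappingSource and its read() method are hypothetical names, whereas the real object implements ISequentialStream::Read(void *, ULONG, ULONG *).

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>

// Serves `budget` bytes by cycling over a small source buffer, so the
// amount streamed does not depend on the buffer's size.
class WrappingSource {
public:
    WrappingSource(const unsigned char* src, std::size_t srcSize, std::size_t budget)
        : src_(src), srcSize_(srcSize), left_(budget), pos_(0) {}

    // Copies up to cb bytes into pv and returns the number copied (0 at end of data).
    std::size_t read(void* pv, std::size_t cb) {
        if (srcSize_ == 0)
            return 0;  // nothing to cycle over (the sample's Init rejects this case)
        unsigned char* out = static_cast<unsigned char*>(pv);
        std::size_t written = 0;
        while (written < cb && left_ > 0) {
            std::size_t n = std::min(left_, cb - written);  // stay within budget and request
            n = std::min(n, srcSize_ - pos_);               // don't run off the source buffer
            std::memcpy(out + written, src_ + pos_, n);
            written += n;
            left_ -= n;
            pos_ = (pos_ + n) % srcSize_;                   // wrap around the source buffer
        }
        return written;
    }

private:
    const unsigned char* src_;
    std::size_t srcSize_;
    std::size_t left_;
    std::size_t pos_;
};
```

For example, a 16-byte source with a 40-byte budget yields the source bytes twice followed by its first 8 bytes, and every later read returns 0.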

Note that the sample does not write data to the provider using the provider's stream interface. Instead, the sample passes a pointer to its own stream object, and the provider reads the data from that object. Typically, SQLOLEDB and SQLNCLI read data from our object in 1024-byte chunks until all the data has been processed. Neither SQLOLEDB nor SQLNCLI has a full implementation that allows the consumer to write data to the provider's stream object; only zero-length data can be sent via the provider's stream object.
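In this pull model it is the provider that drives the loop: it keeps calling Read with a fixed chunk size until no more data comes back. The sketch below models that loop portably, using the 1024-byte chunk size quoted above; ReadFn and pullInChunks are hypothetical names, not part of any provider API, and a real provider may also stop once it has read the number of bytes given in the length binding.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Consumer-supplied read callback: fills buf with up to cb bytes, returns the count.
using ReadFn = std::function<std::size_t(unsigned char* buf, std::size_t cb)>;

// Provider-side view: pull fixed-size chunks until a read returns no data.
// Returns the size of every non-empty chunk that was received.
std::vector<std::size_t> pullInChunks(const ReadFn& read, std::size_t chunk = 1024) {
    std::vector<unsigned char> buf(chunk);
    std::vector<std::size_t> sizes;
    for (;;) {
        std::size_t got = read(buf.data(), chunk);
        if (got == 0)
            break;  // end of stream
        sizes.push_back(got);
    }
    return sizes;
}
```

With a source holding 2500 bytes, this loop receives chunks of 1024, 1024 and 452 bytes, then stops on the empty read.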

The consumer-implemented ISequentialStream object can be used with rowset data (IRowsetFastLoad::InsertRow as in this sample, IRowsetChange::InsertRow, IRowsetChange::SetData) and with parameters by binding a parameter as DBTYPE_IUNKNOWN.
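Handing an interface pointer to the provider through a DBTYPE_IUNKNOWN binding also hands over a reference: the sample's own comment says the provider releases the object, which is why wmain calls AddRef before each InsertRow. The sketch below is a portable model of that hand-off (no COM), assuming exactly one Release by the provider per row; FakeStream, destroyed and simulateRows are hypothetical names.

```cpp
// Minimal intrusive reference count modeled on MySequentialStream.
struct FakeStream {
    unsigned long refs = 1;  // the creator (wmain) owns the first reference
    static int destroyed;    // counts deletions, for illustration only

    unsigned long AddRef() { return ++refs; }
    unsigned long Release() {
        unsigned long r = --refs;
        if (r == 0) {
            ++destroyed;
            delete this;  // don't touch members after this point
        }
        return r;
    }
};
int FakeStream::destroyed = 0;

// One iteration per InsertRow: the consumer adds a reference before the
// hand-off, and the provider releases it after reading the row's data.
void simulateRows(FakeStream* s, int nRows) {
    for (int i = 0; i < nRows; ++i) {
        s->AddRef();   // consumer side
        s->Release();  // provider side
    }
}
```

After 100 simulated rows the object still holds the single reference wmain took at creation, and only the final SAFE_RELEASE destroys it. Without the per-row AddRef, the first provider Release would delete the object while wmain still intends to reuse it.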

Because DBTYPE_IUNKNOWN is specified as the data type in the binding, the data supplied through the stream must already match the type of the column or parameter being targeted: no conversions are possible when sending data via ISequentialStream from rowset interfaces. For parameters, avoid using ICommandWithParameters::SetParameterInfo to specify a different type in order to force a conversion. Doing so would require the provider to cache all the BLOB data locally so that it can convert the data before sending it to SQL Server, and caching and converting a large BLOB locally performs poorly.

Recommended reading:

http://msdn2.microsoft.com/en-us/library/ms131277.aspx (SQL Server 2007)

http://technet.microsoft.com/en-us/library/aa198331(SQL.80).aspx (SQL Server 2000)

 

Code Sample

You can compile this sample at the command line by linking with the following libraries:

  • ole32.lib

  • Oleaut32.lib

You will also need to change MyServer in the provider string (pwszProviderString) to the name of a server at your location.

Before you execute the program, run the following Transact-SQL:

create table fltest(col1 int, col2 int, col3 image)

You can see the results of the sample by running this Transact-SQL:

select top 50 * from fltest

//
// PROGRAM: FastLoadTest
// (C) Microsoft Corporation. All rights reserved
//
// PURPOSE: Insert variable size BLOB data into different rows with IRowsetFastLoad.
// Demonstrate sending BLOB data with stream binding (ISequentialStream)
//
// REQUIREMENTS: create table fltest(col1 int, col2 int, col3 image)
// VERIFICATION: select top 50 * from fltest
//

// Uncomment the next line to send the BLOB column through a consumer-implemented ISequentialStream
// #define USE_ISEQSTREAM

#include <windows.h>

#define DBINITCONSTANTS // Must be defined to initialize constants in oledb.h
#define INITGUID             

#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>

#define MAX_BLOB 200 // For stream binding this can be any size, but for in-line binding it must fit in memory
#define MAX_ROWS 100
#define SAFE_RELEASE(p) do { if (p) { (p)->Release(); (p) = NULL; } } while (0)
#define DROP_TABLE L"drop table fltest"
#define CREATE_TABLE L"create table fltest(col1 int, col2 int, col3 image)"

void usage(void);

#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data. The provider pulls the
// BLOB data out of this object by calling Read.
class MySequentialStream : public ISequentialStream {

private:
    ULONG        m_ulRefCount;
    ULONG        m_ulBufSize;   // Size of the source buffer
    ULONG        m_ulReadSize;  // Total number of bytes the provider may read
    ULONG        m_ulBytesLeft; // Bytes still to be returned by Read
    ULONG        m_ulReadPos;   // Current position within the source buffer
    const BYTE * m_pSrcData;
    const BYTE * m_pReadPtr;
    BOOL         m_fWasRead;

public:

    MySequentialStream(void)
    {
        m_ulRefCount = 1;
        m_ulBufSize = 0;
        m_ulReadSize = 0;
        m_ulBytesLeft = 0;
        m_ulReadPos = 0;
        m_pSrcData = NULL;
        m_pReadPtr = NULL;
        m_fWasRead = FALSE;
    }

    ~MySequentialStream(void)
    {
    }

    virtual ULONG STDMETHODCALLTYPE AddRef(void)
    {
        return ++m_ulRefCount;
    }

    virtual ULONG STDMETHODCALLTYPE Release(void)
    {
        ULONG ulRefCount = --m_ulRefCount;
        if (ulRefCount == 0)
            delete this; // Don't touch members after this
        return ulRefCount;
    }

    virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj)
    {
        if (!ppvObj)
            return E_INVALIDARG;

        *ppvObj = NULL;

        if (riid != IID_ISequentialStream && riid != IID_IUnknown)
            return E_NOINTERFACE;

        AddRef();
        *ppvObj = this;
        return S_OK;
    }

    HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize)
    {
        // Must have a source
        if (NULL == pSrcData)
            return E_INVALIDARG;

        // Source buffer length must be non-zero, because Read wraps around it
        if (0 == ulBufSize)
            return E_INVALIDARG;

        m_ulBufSize = ulBufSize;
        m_ulReadSize = ulReadSize;
        m_pSrcData = (const BYTE *)pSrcData;
        m_pReadPtr = m_pSrcData;
        m_ulBytesLeft = m_ulReadSize;
        m_ulReadPos = 0;
        m_fWasRead = FALSE;

        return S_OK;
    }

    // SQL Server providers (SQLOLEDB/SQLNCLI) don't allow us to write our data to them.
    // Instead, they read from our object.
    virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG *)
    {
        return E_NOTIMPL;
    }

    // This implementation simply copies data from the source buffer in whatever size is requested,
    // wrapping around the buffer until ulReadSize bytes have been returned. You could do anything
    // here instead, such as reading from a file or from another rowset or stream.
    virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead)
    {
        ULONG ulBytesWritten = 0;
        ULONG ulCBToCopy;
        BYTE * pvb = (BYTE *)pv;

        m_fWasRead = TRUE;

        if (NULL == m_pSrcData)
            return E_FAIL;

        if (NULL == pv)
            return STG_E_INVALIDPOINTER;

        while (ulBytesWritten < cb && m_ulBytesLeft)
        {
            // Don't copy more than is left to send or more than the caller still wants
            ulCBToCopy = min(m_ulBytesLeft, cb - ulBytesWritten);

            // Don't read past the end of the source buffer
            ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);

            memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
            pvb += ulCBToCopy;
            ulBytesWritten += ulCBToCopy;
            m_ulBytesLeft -= ulCBToCopy;

            // Wrap reads around the source buffer
            m_ulReadPos += ulCBToCopy;
            if (m_ulReadPos >= m_ulBufSize)
                m_ulReadPos = 0;
        }

        // Returning fewer bytes than requested (including zero) signals the end of the data
        if (pcbRead)
            *pcbRead = ulBytesWritten;

        return S_OK;
    }
};

#endif // USE_ISEQSTREAM

// Enable the SQL Server fast-load extension (SSPROP_ENABLEFASTLOAD) on the data source
// so that IRowsetFastLoad can be requested from IOpenRowset::OpenRowset.
HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize)
{
    HRESULT hr = S_OK;
    IDBProperties * pIDBProps = NULL;
    DBPROP rgProps[1];
    DBPROPSET PropSet;

    VariantInit(&rgProps[0].vValue);

    rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
    rgProps[0].dwStatus = DBPROPSTATUS_OK;
    rgProps[0].colid = DB_NULLID;
    rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;
    rgProps[0].vValue.vt = VT_BOOL;
    rgProps[0].vValue.boolVal = VARIANT_TRUE;

    PropSet.rgProperties = rgProps;
    PropSet.cProperties = 1;
    PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;

    if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps)))
    {
        hr = pIDBProps->SetProperties(1, &PropSet);
    }

    VariantClear(&rgProps[0].vValue);

    if (pIDBProps)
        pIDBProps->Release();

    return hr;
}

// Round a row-buffer offset up to a multiple of 8 so that DBLENGTH values
// (8 bytes on 64-bit builds) and the stream pointer are naturally aligned.
static ULONG AlignOffset(size_t ulOffset)
{
    return (ULONG)((ulOffset + 7) & ~(size_t)7);
}

void wmain()
{
    // Set up the initialization options
    ULONG cProperties = 0;
    DBPROP rgProperties[10];
    ULONG cPropSets = 0;
    DBPROPSET rgPropSets[1];
    LPCWSTR pwszProgID = L"SQLOLEDB";
    LPCWSTR pwszDataSource = NULL;
    LPCWSTR pwszUserID = NULL;
    LPCWSTR pwszPassword = NULL;
    LPCWSTR pwszProviderString = L"server=MyServer;trusted_connection=yes;";

    IDBInitialize *    pIDBInitialize = NULL;
    IDBCreateSession * pIDBCrtSess = NULL;
    IOpenRowset *      pIOpenRowset = NULL;
    IDBCreateCommand * pIDBCrtCmd = NULL;
    ICommandText *     pICmdText = NULL;
    IAccessor *        pIAccessor = NULL;
    IRowsetFastLoad *  pIRowsetFastLoad = NULL;
    IDBProperties *    pIDBProperties = NULL;
    DBBINDING          rgBinding[3];
    DBBINDSTATUS       rgStatus[3];
    ULONG              ulOffset = 0;
    HACCESSOR          hAcc = DB_NULL_HACCESSOR;
    BYTE *             pData = NULL;
    ULONG              iRow = 0;
    WCHAR              wszTableName[] = L"fltest"; // DBID needs a non-const string
    DBID               TableID;

    HRESULT hr;

#ifdef USE_ISEQSTREAM
    BYTE bSrcBuf[1024];                     // A buffer to hold our data for streaming
    memset(bSrcBuf, 0xAB, sizeof(bSrcBuf)); // Stream data value 0xAB
    MySequentialStream * pMySeqStream = new MySequentialStream();
    DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream}; // NULL pObject implies STGM_READ and IID_IUnknown, but is not recommended
#endif

    memset(rgBinding, 0, sizeof(rgBinding));
    TableID.eKind = DBKIND_NAME;
    TableID.uName.pwszName = wszTableName;

    // Col1
    rgBinding[0].iOrdinal = 1;
    rgBinding[0].wType = DBTYPE_I4;
    rgBinding[0].obStatus = ulOffset;
    ulOffset = AlignOffset(ulOffset + sizeof(DBSTATUS));
    rgBinding[0].obLength = ulOffset;
    ulOffset += (ULONG)sizeof(DBLENGTH);
    rgBinding[0].obValue = ulOffset;
    ulOffset = AlignOffset(ulOffset + sizeof(LONG));
    rgBinding[0].cbMaxLen = sizeof(LONG);
    rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
    rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
    rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

    // Col2
    rgBinding[1].iOrdinal = 2;
    rgBinding[1].wType = DBTYPE_I4;
    rgBinding[1].obStatus = ulOffset;
    ulOffset = AlignOffset(ulOffset + sizeof(DBSTATUS));
    rgBinding[1].obLength = ulOffset;
    ulOffset += (ULONG)sizeof(DBLENGTH);
    rgBinding[1].obValue = ulOffset;
    ulOffset = AlignOffset(ulOffset + sizeof(LONG));
    rgBinding[1].cbMaxLen = sizeof(LONG);
    rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
    rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
    rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

    // Col3
    rgBinding[2].iOrdinal = 3;
    rgBinding[2].obStatus = ulOffset;
    ulOffset = AlignOffset(ulOffset + sizeof(DBSTATUS));
    rgBinding[2].obLength = ulOffset;
    ulOffset += (ULONG)sizeof(DBLENGTH);
    rgBinding[2].obValue = ulOffset;
    rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH; // DBPART_LENGTH not needed for providers that don't require length
    rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
    rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

#ifdef USE_ISEQSTREAM
    rgBinding[2].wType = DBTYPE_IUNKNOWN;
    ulOffset += (ULONG)sizeof(ISequentialStream *); // The row buffer holds an interface pointer, not the data
    rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
    rgBinding[2].pObject = &MyObject;
#else
    rgBinding[2].wType = DBTYPE_BYTES;
    ulOffset += MAX_BLOB;
    rgBinding[2].cbMaxLen = MAX_BLOB;
#endif

    // Set init props
    for (ULONG i = 0; i < sizeof(rgProperties) / sizeof(rgProperties[0]); i++)
        VariantInit(&rgProperties[i].vValue);

    // Initialize COM before making any COM library calls, including CLSIDFromProgID
    hr = CoInitialize(NULL);

    // Obtain the provider's CLSID
    CLSID clsidProv;
    if (SUCCEEDED(hr))
        hr = CLSIDFromProgID(pwszProgID, &clsidProv);

    // Get our initial connection
    if (SUCCEEDED(hr))
        hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize, (void **)&pIDBInitialize);

    if (SUCCEEDED(hr))
        hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);

    // DBPROP_INIT_DATASOURCE
    if (pwszDataSource)
    {
        rgProperties[cProperties].dwPropertyID = DBPROP_INIT_DATASOURCE;
        rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
        rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
        rgProperties[cProperties].colid = DB_NULLID;
        rgProperties[cProperties].vValue.vt = VT_BSTR;
        V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);
        cProperties++;
    }

    // DBPROP_AUTH_USERID
    if (pwszUserID)
    {
        rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_USERID;
        rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
        rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
        rgProperties[cProperties].colid = DB_NULLID;
        rgProperties[cProperties].vValue.vt = VT_BSTR;
        V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
        cProperties++;
    }

    // DBPROP_AUTH_PASSWORD
    if (pwszPassword)
    {
        rgProperties[cProperties].dwPropertyID = DBPROP_AUTH_PASSWORD;
        rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
        rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
        rgProperties[cProperties].colid = DB_NULLID;
        rgProperties[cProperties].vValue.vt = VT_BSTR;
        V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
        cProperties++;
    }

    // DBPROP_INIT_PROVIDERSTRING
    if (pwszProviderString)
    {
        rgProperties[cProperties].dwPropertyID = DBPROP_INIT_PROVIDERSTRING;
        rgProperties[cProperties].dwOptions = DBPROPOPTIONS_REQUIRED;
        rgProperties[cProperties].dwStatus = DBPROPSTATUS_OK;
        rgProperties[cProperties].colid = DB_NULLID;
        rgProperties[cProperties].vValue.vt = VT_BSTR;
        V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
        cProperties++;
    }

    if (cProperties)
    {
        rgPropSets[cPropSets].cProperties = cProperties;
        rgPropSets[cPropSets].rgProperties = rgProperties;
        rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
        cPropSets++;
    }

    // Initialize
    if (SUCCEEDED(hr))
        hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);
    if (SUCCEEDED(hr))
        hr = pIDBInitialize->Initialize();
    if (SUCCEEDED(hr))
        printf("\tConnected!\n");
    else
        printf("Unable to connect (hr = 0x%08lX)\n", (unsigned long)hr);

    // Set fastload prop
    if (SUCCEEDED(hr))
        hr = SetFastLoadProperty(pIDBInitialize);

    if (SUCCEEDED(hr))
        hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);

    if (SUCCEEDED(hr))
        hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);

    // Open the table for bulk insert
    if (SUCCEEDED(hr))
        hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);

    if (SUCCEEDED(hr))
        hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);

    if (SUCCEEDED(hr))
        hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, rgStatus);

    if (SUCCEEDED(hr))
    {
        pData = (BYTE *)malloc(ulOffset);
        if (NULL == pData)
            hr = E_OUTOFMEMORY;
    }

    // Stop at the first failure instead of attempting the remaining rows
    for (iRow = 0; SUCCEEDED(hr) && iRow < MAX_ROWS; iRow++)
    {
        // Column 1 data
        *(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
        *(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567; // Ignored for I4 data
        *(LONG *)(pData + rgBinding[0].obValue) = iRow;

        // Column 2 data
        *(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
        *(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567; // Ignored for I4 data
        *(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;

        // Column 3 data
        *(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
        // DBLENGTH tells the provider how much BLOB data to expect (in-line bytes, or bytes to read
        // from the stream); not needed for providers that can send data without a length
        *(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB / (iRow + 1);
#ifdef USE_ISEQSTREAM
        // Set the number of bytes we will let the provider read
        hr = pMySeqStream->Init(bSrcBuf, (ULONG)sizeof(bSrcBuf), MAX_BLOB / (iRow + 1));
        if (FAILED(hr))
            break;
        *(ISequentialStream **)(pData + rgBinding[2].obValue) = pMySeqStream;
        pMySeqStream->AddRef(); // The provider releases the object, so AddRef it to keep it from being destroyed
#else
        memset(pData + rgBinding[2].obValue, 0, MAX_BLOB); // Not strictly necessary
        memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1));
#endif
        hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
    }

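    // Commit the bulk-loaded rows
    if (SUCCEEDED(hr))
        hr = pIRowsetFastLoad->Commit(TRUE);

    if (hAcc != DB_NULL_HACCESSOR)
        pIAccessor->ReleaseAccessor(hAcc, NULL);

    // Release interfaces in the reverse order in which they were obtained
    SAFE_RELEASE(pIAccessor);
    SAFE_RELEASE(pIRowsetFastLoad);
    SAFE_RELEASE(pICmdText);
    SAFE_RELEASE(pIDBCrtCmd);
    SAFE_RELEASE(pIOpenRowset);
    SAFE_RELEASE(pIDBCrtSess);
    SAFE_RELEASE(pIDBProperties);
    SAFE_RELEASE(pIDBInitialize);
#ifdef USE_ISEQSTREAM
    SAFE_RELEASE(pMySeqStream); // Drops the reference taken when the object was created
#endif

    // Free the BSTRs allocated for the initialization properties
    for (ULONG i = 0; i < cProperties; i++)
        VariantClear(&rgProperties[i].vValue);

    if (pData)
        free(pData);

    CoUninitialize();
    return;
}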
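Because select top 50 * only shows raw rows, it can help to know what each row should contain. From the loop in wmain, the row with col1 = iRow receives MAX_BLOB / (iRow + 1) bytes in col3 (0x23 bytes in in-line mode, 0xAB bytes in stream mode), which you could compare against, for example, select col1, datalength(col3) from fltest order by col1. The sketch below computes those expected lengths, assuming the sample's defaults of MAX_BLOB 200 and MAX_ROWS 100; expectedBlobLengths and the k-prefixed constants are hypothetical names.

```cpp
#include <cstddef>
#include <vector>

// Mirrors MAX_BLOB and MAX_ROWS in the sample (assumed defaults)
constexpr std::size_t kMaxBlob = 200;
constexpr std::size_t kMaxRows = 100;

// Expected DATALENGTH(col3) for each row, indexed by col1.
std::vector<std::size_t> expectedBlobLengths() {
    std::vector<std::size_t> lengths;
    for (std::size_t iRow = 0; iRow < kMaxRows; ++iRow)
        lengths.push_back(kMaxBlob / (iRow + 1));  // integer division, as in the sample
    return lengths;
}
```

With these defaults the lengths run 200, 100, 66, 50 and so on, down to 2 bytes for the last row.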