Tips for Running the Dynamic Data System (DDS)

The Dynamic Data System (DDS) makes loading a data warehouse much easier and faster. This system uses SSIS to load standard and partitioned tables via SSIS packages created on the fly using metadata. This eliminates the need to maintain complex SSIS packages. Only basic T-SQL skills are needed. The Dynamic Data System is available on CodePlex.

My current data warehouse project uses SQL Server 2008 R2. Out of the box I found a couple minor configuration issues that prevented me from getting started with using DDS. The following notes, in addition to the documentation provided with the product, should get you on your way.

Enable xp_cmdshell and assign the following roles and permissions to your ETL user account, in this case ZUNISOFTETL:


USE master;

EXECUTE SP_CONFIGURE 'xp_cmdshell', 1;
RECONFIGURE WITH OVERRIDE;
GO

CREATE LOGIN [ZUNISOFTETL] FROM WINDOWS WITH DEFAULT_DATABASE=[DDS], DEFAULT_LANGUAGE=[us_english];
GO

EXEC master..sp_addsrvrolemember @loginame = 'ZUNISOFTETL', @rolename = 'bulkadmin';
GO

GRANT EXECUTE ON xp_cmdshell to [ZUNISOFTETL];
GO

GRANT EXECUTE ON xp_fileexist to [ZUNISOFTETL];
GO

Start SSMS as administrator, login as sa and run the following (the account should be your ETL user account):


EXEC sp_xp_cmdshell_proxy_account 'ZUNISOFTETL','<password>';
GO

Create the following folder structure:


<Drive>
   |
   |- SSIS
        |
        |- ExtractTools
        |- Packages

Make sure that the user ZUNISOFTETL has full control of the above directory structure including the Dynamic_SSIS_Creator.exe file which is placed in the ExtractTools directory.

Check that any scripts that you are using to populate the database with extract schedules, tables, servers, etc. use the directory structure defined above. DYN_Table and DYN_GlobalVariables are the only tables that contain path information.

Any destination databases will need to have the ZUNISOFTETL login added to to them with the dbo role to perform any number of operations that you define for any one extract definition. You will also need to set the following on each destination database:


ALTER DATABASE <database name> SET TRUSTWORTHY ON;
GO

I also tracked down a problem with the products’s command line package creation utility. It simply would not work with my version of SSIS. The biggest change was increasing the size of the stQuery output parameter from 4000 characters to 1,073,741,823. This eliminated issues with building queries for tables with a large number of columns. There were also some minor issues with acquiring connections and re-initializing the associated metadata. The following code can be cut and pasted to fix the package creator issues:


using System;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using System.Data;
using System.Data.OleDb;
using System.Data.SqlClient;

namespace ETL_TEST_5
{
    /// <summary>
    /// Program to create an SSIS package dynamically using the information
    /// stored in the DDS database.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Application entry point.
        /// </summary>
        /// <param name="args">Commandline arguments</param>
        /// <returns></returns>
        static void Main(string[] args)
        {
            // List of variables that need to be set
            int intTableExtractID;
            string DYNDB;
            string stQuery;
            string stSourcedb;
            string stDestdb;
            string stSourceServ;
            string stDestServ;
            string stDestTable;
            string stSourceTable;
            string stOutputfile;
            string stDestSchema;
            string stSourceSchema;
            string stVerbose = "No";
            
            // Collect the required 2 command line arguements
            DYNDB = args[0];
            intTableExtractID = Convert.ToInt32(args[1]);

            // Create connection to the source database for extract info
            SqlConnection connDynDB = new SqlConnection("server=" + DYNDB + ";integrated security=true;" + "database=DDS");

            // Builds an sp call to collect the needed values
            SqlCommand cmdSSISData = new SqlCommand("DYN_SSISData", connDynDB);
            cmdSSISData.CommandType = CommandType.StoredProcedure;

            cmdSSISData.Parameters.Add("@TableExtractID", SqlDbType.Int);
            cmdSSISData.Parameters["@TableExtractID"].Value = intTableExtractID;

            cmdSSISData.Parameters.Add("@stQuery", SqlDbType.NVarChar, 1073741823);
            cmdSSISData.Parameters["@stQuery"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stSourceServ", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stSourceServ"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stSourcedb", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stSourcedb"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stSourceSchema", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stSourceSchema"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stSourceTable", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stSourceTable"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stDestServ", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stDestServ"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stDestdb", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stDestdb"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stDestSchema", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stDestSchema"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stDestTable", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stDestTable"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@stOutputfile", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@stOutputfile"].Direction = ParameterDirection.Output;

            cmdSSISData.Parameters.Add("@Verbose", SqlDbType.NVarChar, 255);
            cmdSSISData.Parameters["@Verbose"].Value = stVerbose;

            // Calls the stored proc to get our needed values.
            connDynDB.Open();

            cmdSSISData.ExecuteNonQuery();
            stQuery = (string)cmdSSISData.Parameters["@stQuery"].Value;
            stSourcedb = (string)cmdSSISData.Parameters["@stSourcedb"].Value;
            stDestdb = (string)cmdSSISData.Parameters["@stDestdb"].Value;
            stSourceServ = (string)cmdSSISData.Parameters["@stSourceServ"].Value;
            stDestServ = (string)cmdSSISData.Parameters["@stDestServ"].Value;
            stDestTable = (string)cmdSSISData.Parameters["@stDestTable"].Value;
            stSourceTable = (string)cmdSSISData.Parameters["@stSourceTable"].Value;
            stOutputfile = (string)cmdSSISData.Parameters["@stOutputfile"].Value;
            stDestSchema = (string)cmdSSISData.Parameters["@stDestSchema"].Value;
            stSourceSchema = (string)cmdSSISData.Parameters["@stSourceSchema"].Value;
            connDynDB.Close();

            Console.WriteLine("Transfering data from {0}.{1}.{2}.{3} to {4}.{5}.{6}.{7}", stSourceServ, stSourcedb, stSourceSchema, stSourceTable, stDestServ, stDestdb, stDestSchema, stDestTable);

            // Creates a new package
            Package package = new Package();

            // Add the data flow task
            Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");

            TaskHost taskHost = dataFlowTask as TaskHost;
            taskHost.Name = "Data Flow Task";

            // We need a reference to the InnerObject to add items to the data flow
            MainPipe pipeline = taskHost.InnerObject as MainPipe;

            // Add an OLE DB connection manager to the package data source.
            ConnectionManager conMgrSource = package.Connections.Add("OLEDB");
            conMgrSource.ConnectionString = "Provider=SQLOLEDB.1;" +
              "Integrated Security=SSPI;Initial Catalog=" + stSourcedb + ";" +
              "Data Source=" + stSourceServ.Replace("[", "").Replace("]","") + ";";
            conMgrSource.Name = "SSIS Connection Source";
            conMgrSource.Description = "OLE DB Source connection to the OLTP.";
            conMgrSource.Name = "Source.Data";

            // Add an OLE DB connection manager to the package data destination.
            ConnectionManager conMgrDest = package.Connections.Add("OLEDB");
            conMgrDest.ConnectionString = "Provider=SQLOLEDB.1;" +
              "Integrated Security=SSPI;Initial Catalog=" + stDestdb + ";" +
              "Data Source=" + stDestServ + ";";
            conMgrDest.Name = "SSIS Connection Destination";
            conMgrDest.Description = "OLE DB Destination connection to the Dest.";
            conMgrDest.Name = "Destination.DB";


            // Add the source
            IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
            srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
            srcComponent.ValidateExternalMetadata = true;
            IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
            srcDesignTimeComponent.ProvideComponentProperties();
            srcComponent.Name = "OleDb Source";

            // Configure it to read from the given table
            srcDesignTimeComponent.SetComponentProperty("AccessMode", 2);
            srcDesignTimeComponent.SetComponentProperty("SqlCommand", stQuery);

            // Set the connection manager
            srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(conMgrSource);
            srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = conMgrSource.ID;

            // Retrieve the column metadata
            srcDesignTimeComponent.AcquireConnections(null);
            srcDesignTimeComponent.ReinitializeMetaData();
            srcDesignTimeComponent.ReleaseConnections();


            // Add the destination
            IDTSComponentMetaData100 destComponent = pipeline.ComponentMetaDataCollection.New();
            destComponent.ComponentClassID = "DTSAdapter.OleDbDestination";
            destComponent.ValidateExternalMetadata = true;

            IDTSDesigntimeComponent100 destDesignTimeComponent = destComponent.Instantiate();
            destDesignTimeComponent.ProvideComponentProperties();
            destComponent.Name = "OleDb Destination";

            destDesignTimeComponent.SetComponentProperty("AccessMode", 3);
            destDesignTimeComponent.SetComponentProperty("OpenRowset", stDestSchema + "." + stDestTable);

            // Set connection
            destComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(conMgrDest);
            destComponent.RuntimeConnectionCollection[0].ConnectionManagerID = conMgrDest.ID;

            // Get metadata
            destDesignTimeComponent.AcquireConnections(null);
            destDesignTimeComponent.ReinitializeMetaData();
            destDesignTimeComponent.ReleaseConnections();

            // Connect source and destination
            IDTSPath100 path = pipeline.PathCollection.New();
            path.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], destComponent.InputCollection[0]);

            // Configure the destination
            IDTSInput100 destInput = destComponent.InputCollection[0];
            IDTSVirtualInput100 destVirInput = destInput.GetVirtualInput();
            IDTSInputColumnCollection100 destInputCols = destInput.InputColumnCollection;
            IDTSExternalMetadataColumnCollection100 destExtCols = destInput.ExternalMetadataColumnCollection;
            IDTSOutputColumnCollection100 sourceColumns = srcComponent.OutputCollection[0].OutputColumnCollection;

            // The OLEDB destination requires you to hook up the external columns
            foreach (IDTSOutputColumn100 outputCol in sourceColumns)
            {
                // Get the external column id
                IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destExtCols[outputCol.Name];
                if (extCol != null)
                {
                    // Create an input column from an output col of previous component.
                    destVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);
                    IDTSInputColumn100 inputCol = destInputCols.GetInputColumnByLineageID(outputCol.ID);
                    if (inputCol != null)
                    {
                        // Print column name for verification
                        Console.WriteLine(inputCol.Name);

                        // map the input column with an external metadata column
                        destDesignTimeComponent.MapInputColumn(destInput.ID, inputCol.ID, extCol.ID);
                    }
                }
            }

            // Saves the created package to the supplied location
            Application app = new Application();
            app.SaveToXml(stOutputfile, package, null);

            Console.WriteLine("SSIS File Location");
            Console.WriteLine(stOutputfile);

        }
    }
}

The DDS system is a good starting point for a metadata driven, dynamic SSIS ETL system. I have already added a number of my own enhancements extending it’s functionality. You should give it a try!

Please follow and like us:

2 Replies to “Tips for Running the Dynamic Data System (DDS)”

Leave a Reply

Your email address will not be published. Required fields are marked *