Maximizing throughput on bulk data processing

Posted on

Question :

I’m using SQL Serverversion 14.0.3035.2

I have a pile of information that I need to process from table A and insert the modified data into table B. I have written an application to query the data, do some binary manipulation, and insert the results into the target DB.

I am convinced the slowdown is during the read.

When I first started running the application, It was pretty speedy. Towards the second half of the execution it’s slowed to a crawl. The difference is the number of rows with the bit IsFetched = 1 in the Source and the number of rows in the target table

Both tables share a drive for the Data files, and also share a drive for the Log files (Dedicated disk for log, dedicated disk for data)

In order to maximize my throughput, I’ve made the application run batches in parallel. My query to get the data and mark it as ‘touched’ looks like this

UPDATE TOP(100) _s
            SET IsFetched = 1
            FROM Source _s
            where _s.IsFetched = 0

The table I’m reading from has the schema

CREATE TABLE [dbo].[Source](
[SourceID] [int] NOT NULL,
[BinaryData] [varbinary](max) NULL,
[IsFetched] [bit] NOT NULL,

    [SourceID] ASC

ALTER TABLE [dbo].[Source] ADD  CONSTRAINT [DF_Source_IsFetched]  DEFAULT ((0)) FOR [IsFetched]

I have a non-clustered index on IsFetched.

CREATE NONCLUSTERED INDEX [Idx_Fetched] ON [dbo].[---]
    [IsFetched] ASC

I’m writing to a table with the schema

CREATE TABLE [dbo].[Target](
[SourceId] [int] NOT NULL,
[BinaryData] [varbinary](max) NULL

As expected, there are locks on fetching the data. However, it seems like it is taking much longer than necessary to get the data. I’ve used this pattern before and achieved a much higher throughput (10’s of thousands of rows per second). Right now I’m getting a maximum throughput of about 200-300 rows per second.The binary data isn’t that big so I don’t think it’s a matter of reading too much data at once.

I’ve found changing the degree of parallelism and batch size doesn’t do much to increase the speed but the fastest I can get is about 20 degrees of parallelism with 10 rows per transaction.

My table doesn’t seem to be too fragmented but my disk seems to be the culprit


Answer :

I should think your workload is not using the index on IsFetched at all. Each successive UPDATE will be starting at the beginning of the table and reading forward until it has processed 100 rows (your TOP value). Subsequent iterations have to read over the previously-fetched rows to get to new ones. Conceptually, the first iteration reads 100 new rows and updates them. The second iteration reads the first 100 rows and skips them then reads & processes 100 new rows. The third reads those 200 rows, skips them, read 100 new rows and process them, and so on.

Why does the query not use the index? The query execution plan is generated when the query is first submitted. At that point all rows in the table have IsFetched = 0. So using the index would not help distinguish which rows to process and which to skip. Indeed using that index would be an overhead since execution would have to read other structures for the other columns’ values. The optimizer has no way of accounting for the fact that this UPDATE will be run many times in succession.

The solution is to use a different architecture based on SourceID, which is unique (and clustered). On each iteration store the highest value of SourceID that was processed, then start the next iteration from that value.

First some set-up and test data

drop table if exists Source;
drop table if exists Target;

create table Source(SourceID int primary key);
create table Target(SourceID int primary key);

insert Source(SourceID) values (1), (2), (3), (4), (5), (6), (7), (8), (9);

I’ll show the code to iterate then describe it after.

declare @batch int = 4;      -- for 9 test rows this gives me 2 full + 1 partial batches
declare @highest int = -1;   -- set to any value lower than what is in your data
declare @var table (SourceID int primary key clustered);
declare @c int = @batch;     -- essentially a flag showing if the previous
                             -- iteration found rows

while @c >= @batch    -- stop once a not-full batch is processed as that
                      -- marks the end of the data
    print 'Highest: ' + cast(@highest as varchar(99));  -- debug

    insert top (@batch)
    into target(SourceID)
        into @var
    from Source
    where SourceID > @highest
    order by SourceID;    -- important to order so each iteration is guaranteed to get
                          -- a contiguous block

    set @c = @@ROWCOUNT;  -- flag whether to iterate

    -- select * from Target;    -- interesting for debug, do NOT use in production!

    set @highest = (select max(SourceID) from @var);
end    -- end iteration

The output messages are

Highest: -1              -- this is the start with the dummy initial value

(4 rows affected)        -- we find rows 1, 2, 3 & 4 and process them
Highest: 4               -- start the second iteration from "> 4"

(4 rows affected)        -- rows 5, 6, 7 & 8 done
Highest: 8               -- third iteration from "> 8"

(1 row affected)         -- only row 9 left.

There’s an assumption that there is an index on the key column (SourceID). Each iteration performs and index seek to where the last left off and then scans leaf pages from there, reading @batch rows. Therefore it obviously works best if the index is covering or is the clustered index, which is covering by definition. Without an index this will revert to table scans and you’re no better than you were before.

I put an index on @var. Inserts will be in slustered sequence so there will be no page splits. The MAX() query will be a single-row lookup, which is nice. If you can a non-durable in-memory table works nicely in this role, too.

I make @batch a variable for ease of debug. It could be hard-coded.

Variable @c is really a flag. Defining as an integer and setting it as I do avoids unnecessary IF statements.

It is important to have an ORDER BY on the SELECT. Without it we are not guaranteed that the rows returned are adjacent. For example, given the data above and @highest = -1, rows 3, 5, 7 & 9 satisfy the WHERE clause, which is not what we desire.

There is no requirement for SourceID values to be contiguous. If there are holes in the sequence, if data has been deleted, this is OK.

I stop once an iteration processes fewer than the desired number of rows.

Note that IsFetched is not required for this. It can be dropped from the schema if this is its only purpose.

There’s a nice write-up here including benchmarking over a large-ish table. Swart’s solution reads ahead to find the end of a batch then processes the batch in a second statement. Mine determines the end of a batch retroactively after a batch is processed. His scans the source table twice and mine once plus an index lookup. His solution will always perform a “no-op” iteration at the end of the table, mine when the table cardinality is an exact multiple of the batch size. I think mine is slightly more efficient but you’re unlikely to notice in a real workload.

If the table source is continuously populated with new rows and not truncated between runs you can save the value of @highest to a table at the end of a run and populate it from that table during the next run. If future runs can insert SourceID values lower than the current largest, or if there are concurrent writes, the algorithm can be adjusted to accommodate.

My experience is that batch sizes of a few thousand rows give the best balance between throughput, blocking and resource consumption. Test on your environment, of course, to see what works for you.

Leave a Reply

Your email address will not be published. Required fields are marked *