One of the most typical query patterns in a data warehouse is to select data from a remote site (i.e., a data source) and insert it into a local staging table. In most cases, this statement also includes some joins to local tables, as well as filter predicates on both local and remote tables. In its most basic form, the typical query pattern looks like the following:
INSERT INTO <local staging table>
SELECT <columns of interest>
FROM <remote_table>@<remote_site> t_remote,
     <local_table> t_local
WHERE <join condition between t_remote and t_local>
AND <filter predicates on t_local>
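For illustration, here is a hypothetical concrete instance of this pattern (all table, column, and database-link names are made up):
INSERT INTO stg_order_lines
SELECT rmt.order_id, rmt.amount, rmt.tx_date
FROM order_lines@src_erp rmt,
     dim_customer lcl
WHERE rmt.cust_id = lcl.cust_id
AND lcl.country = 'GR';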
Very often, the table at the remote site has a significant number of rows. For example, a very common scenario is that the remote table is a large table containing transaction data (e.g., order lines, Call Detail Records, invoice lines, etc.) with a timestamp. The goal of the distributed statement is to select only the rows that correspond to a specific time period (e.g., the transactions of the last day, a.k.a. "the daily delta") and insert them into a local staging table. To achieve this, we maintain locally (i.e., at the data warehouse site) a very small "control table" that holds the current data warehouse "running date" (i.e., the date of interest for transaction data).
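A minimal sketch of this setup, reusing the hypothetical names from above (a real control table may carry more metadata per ETL flow):
-- one-row control table holding the current DW running date
CREATE TABLE dwh_ctl (run_date DATE);
INSERT INTO dwh_ctl VALUES (TRUNC(SYSDATE) - 1);

-- daily-delta extraction: join the tiny local control table
-- to the large remote transaction table
INSERT INTO stg_order_lines
SELECT rmt.order_id, rmt.amount, rmt.tx_date
FROM order_lines@src_erp rmt,
     dwh_ctl ctl
WHERE rmt.tx_date >= ctl.run_date
AND rmt.tx_date < ctl.run_date + 1;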
Conceptually, therefore, we have a join of the very small (local) control table to the very large (remote) transactional table, and this join yields a relatively small number of rows (i.e., the daily delta of transactional data, which is small compared to the whole transactional table). Now, what is the most efficient method to execute this distributed query?
Let's first examine the wrong way to execute it. Frankly, this is the method we see happening so often in real life, and it is a very common reason for delays in the extraction phase of ETL flows. To understand the concept, let's leave aside for a moment the INSERT part and focus on the SELECT, which consists of a join of the (small) local table to the (large) remote table in order to get only the delta as an output.
Since the remote table is the large one, it is not very difficult to infer that the "wrong way" is to execute this join locally, i.e., at the data warehouse site. No matter what join method is chosen by the optimizer (e.g., NESTED LOOPS or HASH JOIN), the end result of a locally processed join will be a very inefficient execution plan. This is simply because a local execution will either cause a large data set to travel over the network, or incur a significant number of round trips due to the consecutive probing of the large table (one probe for each row of the small table).
In Listing 1, we can see the execution of this distributed join locally, with a HASH JOIN
method. Observe the “Remote SQL Information” section, which denotes the query
executed at the remote site. We can see that the whole table at the remote site
travels through the network unrestricted.
select /*+ leading(lcl) use_hash(rmt) */ *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id
Plan hash value: 1246216434
-------------------------------------------------------------------------------------------------------------------------------
| Id | Operation | Name | E-Rows |E-Bytes| Cost (%CPU)| E-Time | Inst |IN-OUT| OMem | 1Mem | Used-Mem |
-------------------------------------------------------------------------------------------------------------------------------
| 0 | SELECT STATEMENT | | | | 29296 (100)| | | | | | |
|* 1 | HASH JOIN | | 99 | 389K| 29296 (1)| 00:05:52 | | | 732K| 732K| 1226K (0)|
| 2 | TABLE ACCESS FULL| TSMALL_LOCAL | 99 | 194K| 16 (0)| 00:00:01 | | | | | |
| 3 | REMOTE | TLARGE | 10000 | 19M| 29279 (1)| 00:05:52 | EXADW~ | R->S | | | |
-------------------------------------------------------------------------------------------------------------------------------
Predicate Information (identified by operation id):
---------------------------------------------------
1 - access("LCL"."ID"="RMT"."ID")
Remote SQL Information (identified by operation id):
----------------------------------------------------
3 - SELECT /*+ USE_HASH ("RMT") */ "ID","DESCR" FROM "TLARGE" "RMT" (accessing 'EXADWHPRD' )
Listing 1: The execution of the distributed join locally causes the whole table at the remote site to travel through the network unrestricted.
In Listing 2, we can see the same join executed locally, with a NESTED LOOPS method. In this case, for each row of the driving table (at the local site) we execute at the remote site the statement seen in the "Remote SQL Information" section. In other words, for each row of the local table we probe the large table at the remote site with a specific predicate. The bind variable denotes the predicate that we send for each such probe. This might lead to a significant number of network round trips.
select *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id
----------------------------------------------------------------------------------------------------
| Id | Operation | Name | E-Rows |E-Bytes| Cost (%CPU)| E-Time | Inst |IN-OUT|
----------------------------------------------------------------------------------------------------
| 0 | SELECT STATEMENT | | | | 920 (100)| | | |
| 1 | NESTED LOOPS | | 99 | 389K| 920 (2)| 00:00:12 | | |
| 2 | TABLE ACCESS FULL| TSMALL_LOCAL | 99 | 194K| 16 (0)| 00:00:01 | | |
| 3 | REMOTE | TLARGE | 1 | 2015 | 9 (0)| 00:00:01 | EXADW~ | R->S |
----------------------------------------------------------------------------------------------------
Remote SQL Information (identified by operation id):
----------------------------------------------------
3 - SELECT "ID","DESCR" FROM "TLARGE" "RMT" WHERE :1="ID" (accessing 'EXADWHPRD' )
Listing 2: The execution of the distributed join locally with NESTED LOOPS might result in a significant number of round trips.
In Listing 3, we have simulated this scenario by using a function on the local column in the join predicate. Observe how different the plan now is from the one in Listing 2. Note the FILTER operation that has appeared as operation id 3. This operation applies locally the filter that should have been executed remotely. If you check the "Remote SQL Information" section, you will see that no restriction is imposed on the large table at the remote site. The function in the join predicate has produced the worst-case scenario: we are probing the remote large table many times (due to the NESTED LOOPS way of working), and with each such probe the whole large table is sent over the network. This case comes from a true story, where a developer decided to use a function in the join predicate in order to get the DW reference date, instead of using a simple column.
select /*+ leading(lcl) use_nl(rmt) */ *
from tsmall_local lcl, tlarge@loopback rmt
where
myfunc(lcl.id) = rmt.id
---------------------------------------------------------------------------------------------------
| Id | Operation | Name | Rows | Bytes | Cost (%CPU)| Time | Inst |IN-OUT|
---------------------------------------------------------------------------------------------------
| 0 | SELECT STATEMENT | | 4171 | 16M| 89666 (2)| 00:17:56 | | |
| 1 | NESTED LOOPS | | 4171 | 16M| 89666 (2)| 00:17:56 | | |
| 2 | TABLE ACCESS FULL| TSMALL_LOCAL | 99 | 195K| 17 (0)| 00:00:01 | | |
|* 3 | FILTER | | 42 | 84630 | 900 (1)| 00:00:11 | | |
| 4 | REMOTE | TLARGE | | | | | LOOPB~ | R->S |
---------------------------------------------------------------------------------------------------
Predicate Information (identified by operation id):
---------------------------------------------------
3 - filter("RMT"."ID"="MYFUNC"("LCL"."ID"))
Remote SQL Information (identified by operation id):
----------------------------------------------------
4 - SELECT /*+ USE_NL ("RMT") */ "ID","DESCR" FROM "TLARGE" "RMT" (accessing 'LOOPBACK')
Listing 3: The presence of a function in the join predicate has resulted in a remote query with no restrictions.
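A possible rewrite, sketched under the assumption that myfunc is deterministic: evaluate the function once on the local side (here via a materialized WITH subquery), so that the join again uses a plain column and each probe can be shipped to the remote site as a simple bind, as in Listing 2. Whether the predicate is indeed pushed to the remote site should, as always, be verified in the plan:
with lcl as (
  select /*+ materialize */ myfunc(id) as id_key, descr
  from tsmall_local
)
select /*+ leading(lcl) use_nl(rmt) */ *
from lcl, tlarge@loopback rmt
where lcl.id_key = rmt.id;
Of course, the cleanest fix in the story above is to store the DW reference date as a plain column in the control table in the first place.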
We have discussed the consequences of executing the distributed join locally (i.e., at the DW side). Clearly, the most efficient plan is the one where the small local table is sent over to the remote site and the join is executed remotely (i.e., at the data source and not at the data warehouse). This can easily be achieved with the DRIVING_SITE hint, which instructs the optimizer at which site the execution should take place. This is depicted in Listing 4.
select /*+ driving_site(rmt) */ *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id
-----------------------------------------------------------------------------------------------------------------------------
| Id | Operation | Name | Rows | Bytes | Cost (%CPU)| Time | TQ/Ins |IN-OUT| PQ Distrib |
-----------------------------------------------------------------------------------------------------------------------------
| 0 | SELECT STATEMENT REMOTE | | 4411 | 17M| 9055 (1)| 00:00:01 | | | |
| 1 | PX COORDINATOR | | | | | | | | |
| 2 | PX SEND QC (RANDOM) | :TQ10002 | 4411 | 17M| 9055 (1)| 00:00:01 | Q1,02 | P->S | QC (RAND) |
|* 3 | HASH JOIN BUFFERED | | 4411 | 17M| 9055 (1)| 00:00:01 | Q1,02 | PCWP | |
| 4 | BUFFER SORT | | | | | | Q1,02 | PCWC | |
| 5 | PX RECEIVE | | 4411 | 8679K| 11 (0)| 00:00:01 | Q1,02 | PCWP | |
| 6 | PX SEND HASH | :TQ10000 | 4411 | 8679K| 11 (0)| 00:00:01 | DWHPRD | S->P | HASH |
| 7 | REMOTE | TSMALL_LOCAL | 4411 | 8679K| 11 (0)| 00:00:01 | ! | R->S | |
| 8 | PX RECEIVE | | 10000 | 19M| 9043 (0)| 00:00:01 | Q1,02 | PCWP | |
| 9 | PX SEND HASH | :TQ10001 | 10000 | 19M| 9043 (0)| 00:00:01 | Q1,01 | P->P | HASH |
| 10 | PX BLOCK ITERATOR | | 10000 | 19M| 9043 (0)| 00:00:01 | Q1,01 | PCWC | |
| 11 | TABLE ACCESS STORAGE FULL| TLARGE | 10000 | 19M| 9043 (0)| 00:00:01 | Q1,01 | PCWP | |
-----------------------------------------------------------------------------------------------------------------------------
Predicate Information (identified by operation id):
---------------------------------------------------
3 - access("A2"."ID"="A1"."ID")
Remote SQL Information (identified by operation id):
----------------------------------------------------
7 - SELECT "ID","DESCR" FROM "TSMALL_LOCAL" "A2" (accessing '!' )
Listing 4: With the use of the DRIVING_SITE hint, the join is executed at the remote site.
From the "Remote SQL Information" section we can see that the remote site is the data warehouse and the remote query is a SELECT on the small table. This is the most efficient execution plan for our distributed query.
Unfortunately, if the same distributed query is included inside an INSERT INTO SELECT statement or a CTAS (Create Table As Select) statement, then the optimizer, for no obvious reason, chooses to ignore the DRIVING_SITE hint and executes the SELECT part locally. This limits the usefulness of the DRIVING_SITE hint to distributed queries only. However, there is a workaround to this limitation (described in a post by Jonathan Lewis): wrapping the SELECT part inside a pipelined table function. Another option is to stage the delta at the remote site and then send it over to the DW site. Nevertheless, even if the join is executed locally, you have to guarantee that a filter predicate is applied at the remote site, as in Listing 2, so that only a restricted set of rows travels over the network.
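For completeness, here is a rough sketch of the remote-staging alternative, assuming we are allowed to create objects at the source site and that a database link (hypothetically named dwhprd here) points from the source back to the data warehouse:
-- at the source site: stage the delta there, pulling only the
-- tiny driver table over the network
create table stg_delta as
select rmt.id, rmt.descr
from tlarge rmt, tsmall_local@dwhprd lcl
where lcl.id = rmt.id;

-- at the DW site: only the staged delta now travels over the network
insert into testrmt
select id, descr from stg_delta@exadwhprd;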
In Listing 5, we can clearly see that in an INSERT INTO SELECT statement, the DRIVING_SITE hint is ignored and the nested loops join is executed locally at the Data Warehouse site.
insert into testrmt select /*+ driving_site(rmt) */ lcl.id, rmt.descr
from tsmall_local lcl, tlarge@exadwhprd rmt where lcl.id = rmt.id
Plan hash value: 2127595353
----------------------------------------------------------------------------------------------------------
| Id | Operation | Name | E-Rows |E-Bytes| Cost (%CPU)| E-Time | Inst |IN-OUT|
----------------------------------------------------------------------------------------------------------
| 0 | INSERT STATEMENT | | | | 920 (100)| | | |
| 1 | LOAD TABLE CONVENTIONAL | | | | | | | |
| 2 | NESTED LOOPS | | 99 | 196K| 920 (2)| 00:00:12 | | |
| 3 | TABLE ACCESS FULL | TSMALL_LOCAL | 99 | 1287 | 16 (0)| 00:00:01 | | |
| 4 | REMOTE | TLARGE | 1 | 2015 | 9 (0)| 00:00:01 | EXADW~ | R->S |
----------------------------------------------------------------------------------------------------------
Query Block Name / Object Alias (identified by operation id):
-------------------------------------------------------------
1 - SEL$1
3 - SEL$1 / LCL@SEL$1
4 - SEL$1 / RMT@SEL$1
Remote SQL Information (identified by operation id):
----------------------------------------------------
4 - SELECT /*+ OPAQUE_TRANSFORM */ "ID","DESCR" FROM "TLARGE" "RMT" WHERE :1="ID" (accessing
'EXADWHPRD' )
Listing 5: In an INSERT INTO SELECT statement, the DRIVING_SITE hint is ignored.
Let's implement the workaround with the pipelined table function. The SELECT statement we want to wrap in a pipelined table function is the following:
select /*+ driving_site(rmt) */ lcl.id, rmt.descr
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id
So the distributed insert will become:
insert into testrmt
select *
from table(remdml_pkg.remsel( CURSOR (
select /*+ driving_site(rmt) */
lcl.id, rmt.descr
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id
)));
where remsel is the pipelined table function, defined as follows:
CREATE OR REPLACE PACKAGE remdml_pkg IS
  -- strongly typed ref cursor matching the shape of the wrapped SELECT
  TYPE refcur_t IS REF CURSOR RETURN tsmall_local%ROWTYPE;
  TYPE outrec_typ IS RECORD (
    id    NUMBER(22),
    descr VARCHAR2(4000)
  );
  TYPE outrecset IS TABLE OF outrec_typ;
  FUNCTION remsel (p refcur_t) RETURN outrecset PIPELINED;
END remdml_pkg;
/
CREATE OR REPLACE PACKAGE BODY remdml_pkg IS
  FUNCTION remsel (p refcur_t) RETURN outrecset PIPELINED IS
    out_rec outrec_typ;
    in_rec  p%ROWTYPE;
  BEGIN
    LOOP
      FETCH p INTO in_rec;
      EXIT WHEN p%NOTFOUND;
      -- map the fetched row to the output record and pipe it out
      out_rec.id    := in_rec.id;
      out_rec.descr := in_rec.descr;
      PIPE ROW(out_rec);
    END LOOP;
    CLOSE p;
    RETURN;
  END remsel;
END remdml_pkg;
/
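To verify that the workaround works, we can run the wrapped insert and then inspect the plan of the inner cursor; a sketch along these lines (the SQL_ID lookup is illustrative):
-- locate the inner cursor and display its plan; the statement against
-- tlarge@exadwhprd should now show SELECT STATEMENT REMOTE, as in Listing 4
select sql_id, sql_text
from v$sql
where sql_text like '%driving_site(rmt)%'
and sql_text not like '%v$sql%';

select * from table(dbms_xplan.display_cursor('&sql_id'));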