Friday, January 29, 2016

Efficient Distributed Queries for Oracle Data Warehouses

One of the most typical query patterns for a data warehouse is to “select” data from a remote site (i.e., a data source) and insert them into a local staging table. In the majority of cases, this statement also includes some joins to local tables, as well as filter predicates on both local and remote tables. A typical query pattern, in its most basic form, looks something like the following:

INSERT INTO <local staging table>
SELECT <columns of interest>
FROM   <remote_table>@<remote_site> t_remote,
       <local_table> t_local
WHERE
       <join condition between t_remote and t_local>
AND    <filter predicates on t_local>

Very often, the table at the remote site has a significant number of rows. For example, a very common scenario is that the remote table is a large table containing transaction data (e.g., order lines, Call Detail Records, invoice lines etc.) with a timestamp. The goal of the distributed statement is to select only the rows that correspond to a specific time period (e.g., the transactions of the last day, a.k.a. “the daily delta”) and insert them into a local staging table. In order to achieve this, we maintain locally (i.e., at the data warehouse site) a very small “control table” that holds the current data warehouse “running date” (i.e., the date of interest for transaction data).
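
As a rough sketch of this setup (the dw_control, stg_order_lines and order_lines names, the trx_date column and the remote_site database link are all illustrative, not taken from a real system), the control table and the delta extraction could look something like this:

-- hypothetical one-row control table at the DW site, holding the
-- current "running date" of the data warehouse
create table dw_control (
  running_date date not null
);

-- illustrative delta extraction: join the one-row control table to the
-- large remote transaction table, so that only the transactions of the
-- running date are inserted into the local staging table
insert into stg_order_lines
select t.*
from   order_lines@remote_site t,
       dw_control c
where  t.trx_date >= c.running_date
and    t.trx_date <  c.running_date + 1;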

Conceptually, therefore, we have a join of the very small (local) control table to the very large (remote) transactional table, and this join yields a relatively small number of rows (i.e., the daily delta of transactional data, which is small compared to the whole transactional table). Now, what is the most efficient method to execute this distributed query?

Let’s first examine the wrong way to execute it. To be honest, this is the method we see happening so often in real life, and it is a very common reason for delays in the extraction phase of ETL flows. In order to understand the concept, let’s leave aside for a moment the INSERT part and focus on the SELECT, which consists of a join of the (small) local table to the (large) remote table in order to get only the delta as output.

Since the remote table is the large one, it is not very difficult to infer that the “wrong way” is to execute this join locally, i.e., at the data warehouse site. No matter what join method the optimizer chooses (e.g., NESTED LOOPS or HASH JOIN), the end result of a locally processed join will be a very inefficient execution plan. This is simply because a local execution will either result in a large data set travelling over the network, or in a significant number of roundtrips caused by the consecutive probing of the large table (one probe for each row of the small table).

In Listing 1, we can see the execution of this distributed join locally, with a HASH JOIN method. Observe the “Remote SQL Information” section, which denotes the query executed at the remote site. We can see that the whole table at the remote site travels through the network unrestricted.

select /*+ leading(lcl) use_hash(rmt)  */ *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
 lcl.id = rmt.id

Plan hash value: 1246216434

-------------------------------------------------------------------------------------------------------------------------------
| Id  | Operation          | Name         | E-Rows |E-Bytes| Cost (%CPU)| E-Time   | Inst   |IN-OUT|  OMem |  1Mem | Used-Mem |
-------------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT   |              |        |       | 29296 (100)|          |        |      |       |       |          |
|*  1 |  HASH JOIN         |              |     99 |   389K| 29296   (1)| 00:05:52 |        |      |   732K|   732K| 1226K (0)|
|   2 |   TABLE ACCESS FULL| TSMALL_LOCAL |     99 |   194K|    16   (0)| 00:00:01 |        |      |       |       |          |
|   3 |   REMOTE           | TLARGE       |  10000 |    19M| 29279   (1)| 00:05:52 | EXADW~ | R->S |       |       |          |
-------------------------------------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   1 - access("LCL"."ID"="RMT"."ID")

Remote SQL Information (identified by operation id):
----------------------------------------------------

   3 - SELECT /*+ USE_HASH ("RMT") */ "ID","DESCR" FROM "TLARGE" "RMT" (accessing 'EXADWHPRD' )
 
Listing 1: The execution of the distributed join locally causes the whole table at the remote site to travel through the network unrestricted.

In Listing 2, we can see the same join executed locally, with a NESTED LOOPS method. In this case, for each row of the driving table (at the local site) we execute at the remote site the statement seen in the “Remote SQL Information” section. In other words, for each row of the local table we probe the large table at the remote site with a specific predicate. The bind variable denotes the predicate that we send for each such probe. This might lead to a significant number of roundtrips.

If the (local) driving table returns only a single row (e.g., the reference date, in order to get the corresponding delta of transactions from the remote side), then this execution plan can be acceptable, since there will be only one row from the driving table (i.e., a single roundtrip) and the data travelling over the network will be only the daily delta of transactions. However, it is very important that this date restriction is imposed at the remote site; otherwise the whole table (and not just the delta) will travel over the network. This “unfortunate event” can easily happen (and we have seen it in practice many times) if we use a function to get the reference date.
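
As a minimal sketch of this acceptable variant, reusing the illustrative dw_control and order_lines names from above, the one-row control table drives a NESTED LOOPS join and the date column is referenced directly, so the restriction can be shipped to the remote site as a bind variable:

-- one row from the driving table, plain column in the join predicate:
-- the date restriction travels to the remote site with the single probe,
-- so only the daily delta comes back over the network
select /*+ leading(c) use_nl(t) */ t.*
from   dw_control c,
       order_lines@remote_site t
where  t.trx_date = c.running_date;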

select *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
 lcl.id = rmt.id

----------------------------------------------------------------------------------------------------
| Id  | Operation          | Name         | E-Rows |E-Bytes| Cost (%CPU)| E-Time   | Inst   |IN-OUT|
----------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT   |              |        |       |   920 (100)|          |        |      |
|   1 |  NESTED LOOPS      |              |     99 |   389K|   920   (2)| 00:00:12 |        |      |
|   2 |   TABLE ACCESS FULL| TSMALL_LOCAL |     99 |   194K|    16   (0)| 00:00:01 |        |      |
|   3 |   REMOTE           | TLARGE       |      1 |  2015 |     9   (0)| 00:00:01 | EXADW~ | R->S |
----------------------------------------------------------------------------------------------------

Remote SQL Information (identified by operation id):
----------------------------------------------------

   3 - SELECT "ID","DESCR" FROM "TLARGE" "RMT" WHERE :1="ID" (accessing 'EXADWHPRD' )
 
Listing 2: The execution of the distributed join locally with a NESTED LOOPS method may result in a significant number of roundtrips.

In Listing 3, we have simulated this scenario by using a function to express the local column in the join predicate. Observe how different the plan now is from the one in Listing 2. Note the FILTER operation that has shown up as operation id 3. This operation applies locally the filter that should have been executed remotely. If you check the “Remote SQL Information” section, you will see that no restriction is imposed on the large table at the remote site. The function in the join predicate has resulted in the worst-case scenario: we are now probing the remote large table many times (due to the NESTED LOOPS way of working), and with each such probe the whole large table is sent over the network. This case comes from a true story, where a developer decided to use a function in the join predicate in order to get the DW reference date, instead of using a simple column.

select /*+ leading(lcl) use_nl(rmt) */ *
from tsmall_local lcl, tlarge@loopback rmt
where
 myfunc(lcl.id) = rmt.id

---------------------------------------------------------------------------------------------------
| Id  | Operation          | Name         | Rows  | Bytes | Cost (%CPU)| Time     | Inst   |IN-OUT|
---------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT   |              |  4171 |    16M| 89666   (2)| 00:17:56 |        |      |
|   1 |  NESTED LOOPS      |              |  4171 |    16M| 89666   (2)| 00:17:56 |        |      |
|   2 |   TABLE ACCESS FULL| TSMALL_LOCAL |    99 |   195K|    17   (0)| 00:00:01 |        |      |
|*  3 |   FILTER           |              |    42 | 84630 |   900   (1)| 00:00:11 |        |      |
|   4 |    REMOTE          | TLARGE       |       |       |            |          | LOOPB~ | R->S |
---------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   3 - filter("RMT"."ID"="MYFUNC"("LCL"."ID"))

Remote SQL Information (identified by operation id):
----------------------------------------------------

   4 - SELECT /*+ USE_NL ("RMT") */ "ID","DESCR" FROM "TLARGE" "RMT" (accessing 'LOOPBACK')
 
Listing 3: The presence of a function in the join predicate has resulted in a remote query with no restrictions.

We have discussed the consequences of executing the distributed join locally (i.e., at the DW site). Clearly, the most efficient plan is the one where the small local table is sent over to the remote site and the join is executed remotely (i.e., at the data source and not at the data warehouse). This can easily be achieved with the DRIVING_SITE hint, which instructs the optimizer at which site we want the execution to take place. This is depicted in Listing 4.

select /*+ driving_site(rmt)   */ *
from tsmall_local lcl, tlarge@exadwhprd rmt
where
 lcl.id = rmt.id 

-----------------------------------------------------------------------------------------------------------------------------
| Id  | Operation                       | Name         | Rows  | Bytes | Cost (%CPU)| Time     | TQ/Ins |IN-OUT| PQ Distrib |
-----------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT REMOTE         |              |  4411 |    17M|  9055   (1)| 00:00:01 |        |      |            |
|   1 |  PX COORDINATOR                 |              |       |       |            |          |        |      |            |
|   2 |   PX SEND QC (RANDOM)           | :TQ10002     |  4411 |    17M|  9055   (1)| 00:00:01 |  Q1,02 | P->S | QC (RAND)  |
|*  3 |    HASH JOIN BUFFERED           |              |  4411 |    17M|  9055   (1)| 00:00:01 |  Q1,02 | PCWP |            |
|   4 |     BUFFER SORT                 |              |       |       |            |          |  Q1,02 | PCWC |            |
|   5 |      PX RECEIVE                 |              |  4411 |  8679K|    11   (0)| 00:00:01 |  Q1,02 | PCWP |            |
|   6 |       PX SEND HASH              | :TQ10000     |  4411 |  8679K|    11   (0)| 00:00:01 | DWHPRD | S->P | HASH       |
|   7 |        REMOTE                   | TSMALL_LOCAL |  4411 |  8679K|    11   (0)| 00:00:01 |      ! | R->S |            |
|   8 |     PX RECEIVE                  |              | 10000 |    19M|  9043   (0)| 00:00:01 |  Q1,02 | PCWP |            |
|   9 |      PX SEND HASH               | :TQ10001     | 10000 |    19M|  9043   (0)| 00:00:01 |  Q1,01 | P->P | HASH       |
|  10 |       PX BLOCK ITERATOR         |              | 10000 |    19M|  9043   (0)| 00:00:01 |  Q1,01 | PCWC |            |
|  11 |        TABLE ACCESS STORAGE FULL| TLARGE       | 10000 |    19M|  9043   (0)| 00:00:01 |  Q1,01 | PCWP |            |
-----------------------------------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   3 - access("A2"."ID"="A1"."ID")

Remote SQL Information (identified by operation id):
----------------------------------------------------

   7 - SELECT "ID","DESCR" FROM "TSMALL_LOCAL" "A2" (accessing '!' )
 
Listing 4: With the use of the DRIVING_SITE hint the join is executed at the remote site.

From the “Remote SQL Information” section we can see that the remote site is now the data warehouse and the remote query is a SELECT on the small table. This is the most efficient execution plan for our distributed query.

Unfortunately, if the same distributed query is included inside an INSERT INTO SELECT statement, or a CTAS (Create Table As Select) statement, then the optimizer, for no obvious reason, chooses to ignore the DRIVING_SITE hint and executes the SELECT part locally. This limits the usefulness of the DRIVING_SITE hint to pure distributed queries. However, there is a workaround to this limitation (described in a post by Jonathan Lewis): wrap the SELECT part inside a pipelined table function. Another option is to stage the delta at the remote site and then send that staging table over to the DW site (see the sketch below). Nevertheless, even if the join is executed locally, you have to guarantee that a filter predicate is applied at the remote site, as in Listing 2, so that only a restricted set of rows travels over the network.
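
As a rough sketch of the remote-staging option (the stg_delta name and the date literal are illustrative; in practice the date would come from the control table), the idea is to materialize the delta where the large table lives and then pull over only the result:

-- at the remote (source) site: materialize the daily delta next to the
-- large transaction table, so the filtering happens where the data lives
create table stg_delta as
select *
from   order_lines
where  trx_date = date '2016-01-28';   -- illustrative literal

-- at the DW site: pull only the small, pre-filtered staging table
-- over the database link
insert into stg_order_lines
select * from stg_delta@remote_site;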

In Listing 5, we can clearly see that in an INSERT INTO SELECT statement the DRIVING_SITE hint is ignored and the NESTED LOOPS join is executed locally, at the data warehouse site.

insert into testrmt select /*+ driving_site(rmt)  */ lcl.id, rmt.descr
from tsmall_local lcl, tlarge@exadwhprd rmt where  lcl.id = rmt.id

Plan hash value: 2127595353

----------------------------------------------------------------------------------------------------------
| Id  | Operation                | Name         | E-Rows |E-Bytes| Cost (%CPU)| E-Time   | Inst   |IN-OUT|
----------------------------------------------------------------------------------------------------------
|   0 | INSERT STATEMENT         |              |        |       |   920 (100)|          |        |      |
|   1 |  LOAD TABLE CONVENTIONAL |              |        |       |            |          |        |      |
|   2 |   NESTED LOOPS           |              |     99 |   196K|   920   (2)| 00:00:12 |        |      |
|   3 |    TABLE ACCESS FULL     | TSMALL_LOCAL |     99 |  1287 |    16   (0)| 00:00:01 |        |      |
|   4 |    REMOTE                | TLARGE       |      1 |  2015 |     9   (0)| 00:00:01 | EXADW~ | R->S |
----------------------------------------------------------------------------------------------------------

Query Block Name / Object Alias (identified by operation id):
-------------------------------------------------------------

   1 - SEL$1
   3 - SEL$1 / LCL@SEL$1
   4 - SEL$1 / RMT@SEL$1

Remote SQL Information (identified by operation id):
----------------------------------------------------

   4 - SELECT /*+ OPAQUE_TRANSFORM */ "ID","DESCR" FROM "TLARGE" "RMT" WHERE :1="ID" (accessing
       'EXADWHPRD' )
 
Listing 5: In an INSERT INTO SELECT statement the DRIVING_SITE hint is ignored.

Let’s implement the workaround with the pipelined table function. The SELECT statement we want to wrap in a pipelined table function is the following:

select /*+ driving_site(rmt)  */ lcl.id, rmt.descr
from tsmall_local lcl, tlarge@exadwhprd rmt
where
lcl.id = rmt.id

So the distributed insert will become:

insert into testrmt
select *
from   table(remdml_pkg.remsel(CURSOR(
         select /*+ driving_site(rmt) */ lcl.id, rmt.descr
         from   tsmall_local lcl, tlarge@exadwhprd rmt
         where  lcl.id = rmt.id
       )));

where remsel is the pipelined table function and here is how it is defined:

CREATE OR REPLACE PACKAGE remdml_pkg IS
  -- strong REF CURSOR type: the wrapped SELECT must return rows shaped like tsmall_local (id, descr)
  TYPE refcur_t IS REF CURSOR RETURN tsmall_local%ROWTYPE;
  -- output row type and collection type returned by the pipelined function
  TYPE outrec_typ IS RECORD (
    id     NUMBER(22),
    descr  VARCHAR2(4000)
  );
  TYPE outrecset IS TABLE OF outrec_typ;
  FUNCTION remsel (p refcur_t) RETURN outrecset PIPELINED;
END remdml_pkg;
/

CREATE OR REPLACE PACKAGE BODY remdml_pkg IS
  FUNCTION remsel (p refcur_t) RETURN outrecset PIPELINED IS
    out_rec outrec_typ;
    in_rec  p%ROWTYPE;
  BEGIN
    LOOP
      FETCH p INTO in_rec;
      EXIT WHEN p%NOTFOUND;
      -- copy the fetched row into the output record and pipe it to the caller
      out_rec.id    := in_rec.id;
      out_rec.descr := in_rec.descr;
      PIPE ROW(out_rec);
    END LOOP;
    CLOSE p;
    RETURN;
  END remsel;
END remdml_pkg;
/
