databus原理,databus( 二 )


描述:维护一个databus源的持久映射为一个bit数字 。
定义在:database/*/createSchema/schema/cdsddl.tab支持最大数字为126个源,因为源的位掩码必须是一个数字 。

描述
name
Databus源的名字
bitnum
在sy$txlog表mask列与这个源相对应的bit数字约束:
sy$sources_pk - name的主键约束,在database/*/createSchema/schema/cdsddl.con中声明 。
索引:
sy$sources_I1 - bitnum的辅助索引,在database/*/createSchema/schema/cdsddl.ind中声明 。
sy$txlog表:
描述:databus源的更新日志 。这个表被源表的insert/update触发器更新 。
定义在:database/*/createSchema/schema/cdsddl.tab其他
启用 rowdependencies

描述
txn
事务id(通过sy$scn_req生成)
scn
系统变化号,事务第一个变化的ora_rowscn
mask
在事务更新中获取的源掩码;查看sy$sources
ts
事务的时间戳
约束:
sy$txlog_pk - txn列上的主键约束,声明在:database/*/createSchema/schema/cdsddl.con索引:
sy$txlog_I1 - scn列上的辅助索引;声明在:database/*/createSchema/schema/cdsddl.indsync_core_settings表:
描述:维护databus的设置;
定义在:database/*/createSchema/schema/cdsddl.tab列
描述
RAISE_DBMS_ALERTS
如果是'N',禁用signal_beep的有条件的警报
Databus 源视图:
描述:定义一个表的视图通过databus暴露表.
定义在:database/*/createSchema/schema/cdsddl.vwcreate or replace force view sy$T as
select
txn,
f1,
f2,
:
:
from
T
存储过程
sync_core 包:
描述:databus核心方法 。
定义在:database/*/createSchema/schema/cdsddl.prc1包变量:
变量名
描述
lastTxID
最新事务ID,DBMS_TRANSACTION.LOCAL_TRANSACTION_ID,在getTxn()中用于确定事务边界 。
currentTxn
sy$txlog表最后一个更新/添加的记录的txn列的值 。
currentMask
sy$txlog表最后一个更新/添加的记录的mask列的值 。
source_bits
从databus源名字映射到一个bitmask(位掩码)
包方法:
方法原型
描述
function getScn(v_scn in number, v_ora_rowscn in number) return numberImplements infinity == v_scn ? v_ora_rowscn : v_scnfunction getMask(source in varchar) return number返回databus源名字对应的bitmask(位掩码)
function getTxn(source in varchar) return number表示source正在被一个事务更新 。相应的更新sy$txlog 。返回事务的txn 。
procedure coalesce_log
更新sy$txlog表中所有没有设置相应的ora_rowscn值的所有记录 。这是需要的,因为ora_rowscn是在事务提交的时候生成的,换句话说,在sy$txlog表的记录添加后生成 。
procedure signal_beep
提出了一个sy$alert警告,如果sync_core_settings表的RAISE_DBMS_ALERTS设置为'Y' 。
unconditional_signal_beep
提出一个sy$alert警告 。
sync_alert包:
描述:管理来自databus源警报的日常工作 。
定义在:database/*/createSchema/schema/cdsddl.prc 。
包变量:
变量名字
描述
is_registered
Boolean标记:sy$alert警报是否注册
包方法:
方法原型
描述
function registerSourceWithVersion(source in varchar, version in number) return number注册一个带版本号的源:注册之后,所有发生在这个源上的事件都被waitForEvent返回;如果源不存在返回null(否则返回源) 。
procedure unregisterAllSources
不注册所有源,访问这个之后,时间不再被waitForEvent返回 。
function waitForEvent(maxWait in number) return varchar等待一个时间不超过指定的时间(秒) 。返回与时间相关的消息 。
Jobs
J_COALESCE_LOG
描述:每两秒运行一次sync_core.coalesce_log 。
定义在:database/*/createSchema/schema/cdsddl.prcJ_CALL_SIGNAL
描述:每秒调用一次sync_core.unconditional_signal_beep定义在:database/*/createSchema/schema/cdsddl.prc触发器
T是databus源的名字(sy$sources的name)
CREATE TRIGGER T_DATABUS_TRG
before insert or update on T
referencing old as old new as new
for each row
begin
if (updating and :new.txn < 0) then :new.txn := -:new.txn; else :new.txn := sync_core.getTxn('T'); end if; end;Pull Queries (拉取查询)
定期查询
配置设置:chunkedScnThreshold == -1, useRowChunking = N/A查询参数
sinceScn- 从relay/client读取的最后的SCN 。
SELECT /*+ first_rows LEADING(tx) */ sync_core.getScn(tx.scn, tx.ora_rowscn) scn, tx.ts event_timestamp, src.*FROM sy$src, sy$txlog tx
WHERE src.txn=tx.txn AND tx.scn > :sinceScn AND tx.ora_rowscn > :sinceScn行分块查询
配置设置:chunkedScnThreshold > 0, useRowChunking = true查询参数
sinceScn- 从relay/client读取的最后的SCN 。
rowsPerChunk - 数据块大小(在sy$txlog中的行)SELECT scn, event_timestamp, src.*
FROM sy$src,
( SELECT /*+ first_rows LEADING(tx) */ sync_core.getScn(tx.scn, tx.ora_rowscn) scn, tx.ts event_timestamp, tx.txn, row_number() OVER (ORDER BY TX.SCN) rFROM sy$txlog tx ");

推荐阅读