use strict;
use warnings;
use Test::Exception;
use Test::More;
use DBIx::Class::Optional::Dependencies ();
use lib qw(t/lib);
use DBICTest::RunMode;
use DBIC::SqlMakerTest;
use DBIx::Class::SQLMaker::LimitDialects;
my $ROWS = DBIx::Class::SQLMaker::LimitDialects->__rows_bindtype,
my $TOTAL = DBIx::Class::SQLMaker::LimitDialects->__total_bindtype,
$ENV{NLS_SORT} = "BINARY";
$ENV{NLS_COMP} = "BINARY";
$ENV{NLS_LANG} = "AMERICAN";
my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_ORA_${_}" } qw/DSN USER PASS/};
plan skip_all => 'Set $ENV{DBICTEST_ORA_DSN}, _USER and _PASS to run this test.'
unless ($dsn && $user && $pass);
plan skip_all => 'Test needs ' . DBIx::Class::Optional::Dependencies->req_missing_for ('rdbms_oracle')
unless DBIx::Class::Optional::Dependencies->req_ok_for ('rdbms_oracle');
use DBICTest::Schema::Artist;
BEGIN {
DBICTest::Schema::Artist->add_column('parentid');
DBICTest::Schema::Artist->has_many(
children => 'DBICTest::Schema::Artist',
{ 'foreign.parentid' => 'self.artistid' }
);
DBICTest::Schema::Artist->belongs_to(
parent => 'DBICTest::Schema::Artist',
{ 'foreign.artistid' => 'self.parentid' }
);
}
use DBICTest;
use DBICTest::Schema;
my $schema = DBICTest::Schema->connect($dsn, $user, $pass);
note "Oracle Version: " . $schema->storage->_server_info->{dbms_version};
my $dbh = $schema->storage->dbh;
do_creates($dbh);
### test hierarchical queries
{
$schema->resultset('Artist')->create ({
name => 'root',
rank => 1,
cds => [],
children => [
{
name => 'child1',
rank => 2,
children => [
{
name => 'grandchild',
rank => 3,
cds => [
{
title => "grandchilds's cd" ,
year => '2008',
tracks => [
{
position => 1,
title => 'Track 1 grandchild',
}
],
}
],
children => [
{
name => 'greatgrandchild',
rank => 3,
}
],
}
],
},
{
name => 'child2',
rank => 3,
},
],
});
$schema->resultset('Artist')->create({
name => 'cycle-root',
children => [
{
name => 'cycle-child1',
children => [ { name => 'cycle-grandchild' } ],
},
{
name => 'cycle-child2'
},
],
});
$schema->resultset('Artist')->find({ name => 'cycle-root' })
->update({ parentid => { -ident => 'artistid' } });
# select the whole tree
{
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
)',
[ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'] ],
);
is_deeply (
[ $rs->get_column ('name')->all ],
[ qw/root child1 grandchild greatgrandchild child2/ ],
'got artist tree',
);
is_same_sql_bind (
$rs->count_rs->as_query,
'(
SELECT COUNT( * )
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
)',
[ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'] ],
);
is( $rs->count, 5, 'Connect By count ok' );
}
# use order siblings by statement
SKIP: {
# http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
skip q{Oracle8i doesn't support ORDER SIBLINGS BY}, 1
if $schema->storage->_server_info->{normalized_dbms_version} < 9;
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
order_siblings_by => { -desc => 'name' },
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
ORDER SIBLINGS BY name DESC
)',
[ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'] ],
);
is_deeply (
[ $rs->get_column ('name')->all ],
[ qw/root child2 child1 grandchild greatgrandchild/ ],
'Order Siblings By ok',
);
}
# get the root node
{
my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
WHERE ( parentid IS NULL )
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
)',
[ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'] ],
);
is_deeply(
[ $rs->get_column('name')->all ],
[ 'root' ],
'found root node',
);
}
# combine a connect by with a join
SKIP: {
# http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
skip q{Oracle8i doesn't support connect by with join}, 1
if $schema->storage->_server_info->{normalized_dbms_version} < 9;
my $rs = $schema->resultset('Artist')->search(
{'cds.title' => { -like => '%cd'} },
{
join => 'cds',
start_with => { 'me.name' => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
}
);
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
LEFT JOIN cd cds ON cds.artist = me.artistid
WHERE ( cds.title LIKE ? )
START WITH me.name = ?
CONNECT BY parentid = PRIOR artistid
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
=> '%cd'],
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
=> 'root'],
],
);
is_deeply(
[ $rs->get_column('name')->all ],
[ 'grandchild' ],
'Connect By with a join result name ok'
);
is_same_sql_bind (
$rs->count_rs->as_query,
'(
SELECT COUNT( * )
FROM artist me
LEFT JOIN cd cds ON cds.artist = me.artistid
WHERE ( cds.title LIKE ? )
START WITH me.name = ?
CONNECT BY parentid = PRIOR artistid
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
=> '%cd'],
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
=> 'root'],
],
);
is( $rs->count, 1, 'Connect By with a join; count ok' );
}
# combine a connect by with order_by
{
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
order_by => { -asc => [ 'LEVEL', 'name' ] },
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
ORDER BY LEVEL ASC, name ASC
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'],
],
);
# Don't use "$rs->get_column ('name')->all" they build a query arround the $rs.
# If $rs has a order by, the order by is in the subquery and this doesn't work with Oracle 8i.
# TODO: write extra test and fix order by handling on Oracle 8i
is_deeply (
[ map { $_->[1] } $rs->cursor->all ],
[ qw/root child1 child2 grandchild greatgrandchild/ ],
'Connect By with a order_by - result name ok (without get_column)'
);
SKIP: {
skip q{Connect By with a order_by - result name ok (with get_column), Oracle8i doesn't support order by in a subquery},1
if $schema->storage->_server_info->{normalized_dbms_version} < 9;
is_deeply (
[ $rs->get_column ('name')->all ],
[ qw/root child1 child2 grandchild greatgrandchild/ ],
'Connect By with a order_by - result name ok (with get_column)'
);
}
}
# limit a connect by
SKIP: {
skip q{Oracle8i doesn't support order by in a subquery}, 1
if $schema->storage->_server_info->{normalized_dbms_version} < 9;
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
order_by => [ { -asc => 'name' }, { -desc => 'artistid' } ],
rows => 2,
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM (
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
ORDER BY name ASC, artistid DESC
) me
WHERE ROWNUM <= ?
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'], [ $ROWS => 2 ],
],
);
is_deeply (
[ $rs->get_column ('name')->all ],
[qw/child1 child2/],
'LIMIT a Connect By query - correct names'
);
is_same_sql_bind (
$rs->count_rs->as_query,
'(
SELECT COUNT( * )
FROM (
SELECT me.artistid
FROM (
SELECT me.artistid
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
) me
WHERE ROWNUM <= ?
) me
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'],
[ $ROWS => 2 ],
],
);
is( $rs->count, 2, 'Connect By; LIMIT count ok' );
}
# combine a connect_by with group_by and having
# add some bindvals to make sure things still work
{
my $rs = $schema->resultset('Artist')->search({}, {
select => \[ 'COUNT(rank) + ?', [ __cbind => 3 ] ],
as => 'cnt',
start_with => { name => 'root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
group_by => \[ 'rank + ? ', [ __gbind => 1] ],
having => \[ 'count(rank) < ?', [ cnt => 2 ] ],
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT COUNT(rank) + ?
FROM artist me
START WITH name = ?
CONNECT BY parentid = PRIOR artistid
GROUP BY( rank + ? ) HAVING count(rank) < ?
)',
[
[ { dbic_colname => '__cbind' }
=> 3 ],
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'root'],
[ { dbic_colname => '__gbind' }
=> 1 ],
[ { dbic_colname => 'cnt' }
=> 2 ],
],
);
is_deeply (
[ $rs->get_column ('cnt')->all ],
[4, 4],
'Group By a Connect By query - correct values'
);
}
# select the whole cycle tree without nocylce
{
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'cycle-root' },
connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
});
# ORA-01436: CONNECT BY loop in user data
throws_ok { $rs->get_column ('name')->all } qr/ORA-01436/,
"connect by initify loop detection without nocycle";
}
# select the whole cycle tree with nocylce
SKIP: {
# http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/expressi.htm#1023748
skip q{Oracle8i doesn't support connect by nocycle}, 1
if $schema->storage->_server_info->{normalized_dbms_version} < 9;
my $rs = $schema->resultset('Artist')->search({}, {
start_with => { name => 'cycle-root' },
'+select' => \ 'CONNECT_BY_ISCYCLE',
'+as' => [ 'connector' ],
connect_by_nocycle => { parentid => { -prior => { -ident => 'artistid' } } },
});
is_same_sql_bind (
$rs->as_query,
'(
SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
FROM artist me
START WITH name = ?
CONNECT BY NOCYCLE parentid = PRIOR artistid
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'cycle-root'],
],
);
is_deeply (
[ $rs->get_column ('name')->all ],
[ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
'got artist tree with nocycle (name)',
);
is_deeply (
[ $rs->get_column ('connector')->all ],
[ qw/1 0 0 0/ ],
'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
);
is_same_sql_bind (
$rs->count_rs->as_query,
'(
SELECT COUNT( * )
FROM artist me
START WITH name = ?
CONNECT BY NOCYCLE parentid = PRIOR artistid
)',
[
[ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
=> 'cycle-root'],
],
);
is( $rs->count, 4, 'Connect By Nocycle count ok' );
}
}
done_testing;
sub do_creates {
my $dbh = shift;
eval {
$dbh->do("DROP SEQUENCE artist_autoinc_seq");
$dbh->do("DROP SEQUENCE artist_pk_seq");
$dbh->do("DROP SEQUENCE cd_seq");
$dbh->do("DROP SEQUENCE track_seq");
$dbh->do("DROP TABLE artist");
$dbh->do("DROP TABLE track");
$dbh->do("DROP TABLE cd");
};
$dbh->do("CREATE SEQUENCE artist_pk_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), autoinc_col NUMBER(12), rank NUMBER(38), charfield VARCHAR2(10))");
$dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
$dbh->do("CREATE TABLE cd (cdid NUMBER(12), artist NUMBER(12), title VARCHAR(255), year VARCHAR(4), genreid NUMBER(12), single_track NUMBER(12))");
$dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
$dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
$dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
$dbh->do(qq{
CREATE OR REPLACE TRIGGER artist_insert_trg_pk
BEFORE INSERT ON artist
FOR EACH ROW
BEGIN
IF :new.artistid IS NULL THEN
SELECT artist_pk_seq.nextval
INTO :new.artistid
FROM DUAL;
END IF;
END;
});
$dbh->do(qq{
CREATE OR REPLACE TRIGGER cd_insert_trg
BEFORE INSERT OR UPDATE ON cd
FOR EACH ROW
DECLARE
tmpVar NUMBER;
BEGIN
tmpVar := 0;
IF :new.cdid IS NULL THEN
SELECT cd_seq.nextval
INTO tmpVar
FROM dual;
:new.cdid := tmpVar;
END IF;
END;
});
$dbh->do(qq{
CREATE OR REPLACE TRIGGER track_insert_trg
BEFORE INSERT ON track
FOR EACH ROW
BEGIN
IF :new.trackid IS NULL THEN
SELECT track_seq.nextval
INTO :new.trackid
FROM DUAL;
END IF;
END;
});
}
# clean up our mess
END {
if ($schema and my $dbh = $schema->storage->dbh) {
eval { $dbh->do($_) } for (
'DROP SEQUENCE artist_pk_seq',
'DROP SEQUENCE cd_seq',
'DROP SEQUENCE track_seq',
'DROP TABLE artist',
'DROP TABLE track',
'DROP TABLE cd',
);
};
undef $schema;
}