package UR::Context::LoadingIterator;
use strict;
use warnings;
use UR::Context;
use List::MoreUtils qw(any);
our $VERSION = "0.46"; # UR $VERSION;
# A helper package for UR::Context to handling queries which require loading
# data from outside the current context. It is responsible for collating
# cached objects and incoming objects. When create_iterator() is used in
# application code, this is the iterator that gets returned
#
# These are normal Perl objects, not UR objects, so they get regular
# refcounting and scoping
our @CARP_NOT = qw( UR::Context );
# A boolean flag used in the loading iterator to control whether we need to
# inject loaded objects into other loading iterators' cached lists
my $is_multiple_loading_iterators = 0;
my %all_loading_iterators;
# The set of objects returned by an iterator is initially determined when the
# iterator is created, but the final determination of membership happens when
# the object is about to be returned from the iterator's next() method.
# In practice, this means that an object matches the BoolExpr at iterator
# creation, and no longer matches when that object is about to be returned,
# it will not be returned.
#
# If an object does not match the bx when the iterator is created, it will
# not be returned even if it later changes to match before the iterator is
# exhausted.
#
# If an object changes so that it's sort order changes after the iterator is
# created but before it is returned by the iterator, the object will be
# returned in the order it had at iterator creation time.
# Finally, the LoadingIterator will throw an exception if an object matches
# the BoolExpr at iterator creation time, but is deleted when next() is about
# to return it (ie. isa UR::DeletedRef). Since DeletedRef's die any time you
# try to use them, the object sorters can't sort them. Instead, we'll just
# punt and throw an exception ourselves if we come across one.
#
# This seems like the least suprising thing to do, but there are other solutions:
# 1) just plain don't return the deleted object
# 2) use signal_change to register a callback which will remove objects being deleted
# from all the in-process iterator @$cached lists (accomplishes the same as #1).
# For completeness, this may imply that other signal_change callbacks would remove
# objects that no longer match rules for in-process iterators, and that means that
# next() returns things true at the time next() is called, not when the iterator
# is created.
# 3) Put in some additional infrastructure so we can pull out the ID of a deleted
# object. That lets us call $next_object->id at the end of the closure, and return these
# deleted objects back to the user. Problem being that the user then can't really
# do anything with them. But it would be consistent about returning _all_ objects
# that matched the rule at iterator creation time
# 4) Like #3, but just always return the deleted object before any underlying_context
# object, and then don't try to get its ID at the end if the iterator if it's deleted
sub _create {
my($class, $cached, $context, $normalized_rule, $data_source, $this_get_serial ) = @_;
my $limit = $normalized_rule->template->limit;
my $offset = $normalized_rule->template->offset;
my $db_results_should_be_complete = 1;
if (($offset or defined($limit))
and ( ! $data_source->does_support_limit_offset($normalized_rule)
or any { $_->__changes__ } @$cached
)
) {
# If there are any cached objects, then the asked-for offset may not necessarily
# be the offset that applies to data in the database. And since the offset is not
# meaningful, neither is the limit. Consider a query matching these objects with
# limit => 2, offset => 1
# In memory: 1 2
# In DB : 3 4 5 6
# The result should be (2, 3). If we kept the -offset in the DB's SQL, we would
# have missed object 3.
# Similarly, if any DB rows exist as objects with changed data, then rows returnd
# from the DB might not be included in the results, and the supplied -limit could
# keep us from reading rows that should be returned
my %filters = $normalized_rule->params_list;
delete @filters{'-limit', '-offset'};
$normalized_rule = UR::BoolExpr->resolve_normalized($normalized_rule->subject_class_name, %filters);
$db_results_should_be_complete = 0;
} elsif ($offset) {
# Also apply the offset to the list of cached objects.
if ($offset > @$cached) {
@$cached = ();
} else {
splice(@$cached, 0, $offset);
}
undef($offset); # Now don't have to deal with offset below in the iterator
}
my $underlying_context_iterator = $context->_create_import_iterator_for_underlying_context(
$normalized_rule, $data_source, $this_get_serial);
my $is_monitor_query = $context->monitor_query;
# These are captured by the closure...
my($last_loaded_id, $next_obj_current_context, $next_obj_underlying_context);
my $object_sorter = $normalized_rule->template->sorter();
my $bx_subject_class = $normalized_rule->subject_class_name;
# Collection of object IDs that were read from the DB query. These objects are for-sure
# not deleted, even though a cached object for it might have been turned into a ghost or
# had its properties changed
my %db_seen_ids_that_are_not_deleted;
# Collection of object IDs that were read from the cached object list and haven't been
# seen in the lsit of results from the database (yet). It could be missing from the DB
# results because that row has been deleted, because the DB row still exists but has been
# changed since we loaded it and now doesn't match the BoolExp, or because we're sorting
# results by something other than just ID, that sorted property has been changed in the DB
# and we haven't come across this row yet but will before.
#
# The short story is that if there is anything in this hash when the underlying context iterator
# is exhausted, then the ID-ed object is really deleted, and should be an exception
my %changed_objects_that_might_be_db_deleted;
my $underlying_context_objects_count = 0;
my $cached_objects_count = 0;
# knowing if an object's changed properties are one of the rule's order-by
# properties helps later on in the loading process of detecting deleted DB rows
my %order_by_properties;
if ($normalized_rule->template->order_by) {
%order_by_properties = map { $_ => 1 } @{ $normalized_rule->template->order_by };
}
my $change_is_order_by_property = sub {
foreach my $prop_name ( shift->_changed_property_names ) {
return 1 if exists($order_by_properties{$prop_name});
}
return;
};
my %bx_filter_properties = map { $_ => 1 } $normalized_rule->template->_property_names;
my $change_is_bx_filter_property = sub {
foreach my $prop_name ( shift->_changed_property_names ) {
return 1 if exists($bx_filter_properties{$prop_name});
}
return;
};
my $me_loading_iterator_as_string; # See note below the closure definition
my $loading_iterator = sub {
return if (defined($limit) and !$limit);
my $next_object;
PICK_NEXT_OBJECT_FOR_LOADING:
while (! defined($next_object)) {
if ($underlying_context_iterator && ! defined($next_obj_underlying_context)) {
($next_obj_underlying_context) = $underlying_context_iterator->(1);
$underlying_context_objects_count++ if ($is_monitor_query and defined($next_obj_underlying_context));
if (defined($next_obj_underlying_context)) {
if ($next_obj_underlying_context->isa('UR::DeletedRef')) {
# This object is deleted in the current context and not yet committed
# skip it and pick again
$next_obj_underlying_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
} elsif ($next_obj_underlying_context->__changes__
and
$change_is_order_by_property->($next_obj_underlying_context)
) {
unless (delete $changed_objects_that_might_be_db_deleted{$next_obj_underlying_context->id}) {
$db_seen_ids_that_are_not_deleted{$next_obj_underlying_context->id} = 1;
}
$next_obj_underlying_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
}
}
}
unless (defined $next_obj_current_context) {
($next_obj_current_context) = shift @$cached;
$cached_objects_count++ if ($is_monitor_query and $next_obj_current_context);
}
if (defined($next_obj_current_context) and $next_obj_current_context->isa('UR::DeletedRef')) {
my $obj_to_complain_about = $next_obj_current_context;
# undef it in case the user traps the exception, next time we'll pull another off the list
$next_obj_current_context = undef;
Carp::croak("Attempt to fetch an object which matched $normalized_rule when the iterator was created, "
. "but was deleted in the meantime:\n"
. Data::Dumper::Dumper($obj_to_complain_about) );
}
if (!defined($next_obj_underlying_context)) {
if ($is_monitor_query) {
$context->_log_query_for_rule($bx_subject_class,
$normalized_rule,
"QUERY: loaded $underlying_context_objects_count object(s) total from underlying context.");
}
$underlying_context_iterator = undef;
# Anything left in this hash when the DB iterator is exhausted are object we expected to
# see by now and must be deleted. If any of these object have changes then
# the __merge below will throw an exception
if ($db_results_should_be_complete) {
foreach my $problem_obj (values(%changed_objects_that_might_be_db_deleted)) {
$context->__merge_db_data_with_existing_object($bx_subject_class, $problem_obj, undef, []);
}
}
}
elsif (defined($last_loaded_id)
and
$last_loaded_id eq $next_obj_underlying_context->id)
{
# during a get() with -hints or is_many+is_optional (ie. something with an
# outer join), it's possible that the join can produce the same main object
# as it's chewing through the (possibly) multiple objects joined to it.
# Since the objects will be returned sorted by their IDs, we only have to
# remember the last one we saw
# FIXME - is this still true now that the underlying context iterator and/or
# object fabricator hold off on returning any objects until all the related
# joined data bas been loaded?
$next_obj_underlying_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
}
# decide which pending object to return next
# both the cached list and the list from the database are sorted separately but with
# equivalent algorithms (we hope).
#
# we're collating these into one return stream here
my $comparison_result = undef;
if (defined($next_obj_underlying_context) && defined($next_obj_current_context)) {
$comparison_result = $object_sorter->($next_obj_underlying_context, $next_obj_current_context);
}
my $next_obj_underlying_context_id;
$next_obj_underlying_context_id = $next_obj_underlying_context->id if (defined $next_obj_underlying_context);
my $next_obj_current_context_id;
$next_obj_current_context_id = $next_obj_current_context->id if (defined $next_obj_current_context);
# This if() section is for when the in-memory and DB iterators return the same
# object at the same time.
if (
defined($next_obj_underlying_context)
and defined($next_obj_current_context)
and $comparison_result == 0 # $next_obj_underlying_context->id eq $next_obj_current_context->id
) {
# Both objects sort the same. Since the ID properties are always last in the sort order list,
# this means both objects must be the same object.
$context->_log_query_for_rule($bx_subject_class, $normalized_rule, "QUERY: loaded object was already cached") if ($is_monitor_query);
$next_object = $next_obj_current_context;
$next_obj_current_context = undef;
$next_obj_underlying_context = undef;
}
# This if() section is for when the DB iterator's object sorts first
elsif (
defined($next_obj_underlying_context)
and (
(!defined($next_obj_current_context))
or
($comparison_result < 0) # ($next_obj_underlying_context->id le $next_obj_current_context->id)
)
) {
# db object sorts first
# If we deleted it from memory the DB would not have given it back.
# So it either failed to match the BX now, or one of the order-by parameters changed
if ($next_obj_underlying_context->__changes__) {
# See if one of the changes is an order-by property
if ($change_is_order_by_property->($next_obj_underlying_context)) {
# If the object has changes, and one of the changes is one of the
# order-by properties, then the object will:
# 1) Already have appeared as $next_obj_current_context.
# it will be in $changed_objects_that_might_be_db_deleted - remove it from that list
# 2) Will appear later as $next_obj_current_context.
# Mark here that it's not deleted
unless (delete $changed_objects_that_might_be_db_deleted{$next_obj_underlying_context_id}) {
$db_seen_ids_that_are_not_deleted{$next_obj_underlying_context_id} = 1;
}
} elsif ($change_is_bx_filter_property->($next_obj_underlying_context)) {
# If the object has any changes, then it will appear in the cached object list in
# $next_object_current_context at the appropriate time. For the case where the
# object no longer matches the BoolExpr, then the appropriate time is never.
# Discard this object from the DB and pick again
$next_obj_underlying_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
} else {
# some other kind of change?
$next_object = $next_obj_underlying_context;
$next_obj_underlying_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
} else {
# If the object has no changes, it must be something newly brought into the system.
$next_object = $next_obj_underlying_context;
$next_obj_underlying_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
}
# This if() section is for when the in-memory iterator's object sorts first
elsif (
defined($next_obj_current_context)
and (
(!defined($next_obj_underlying_context))
or
($comparison_result > 0) # ($next_obj_underlying_context->id ge $next_obj_current_context->id)
)
) {
# The cached object sorts first
# Either it was changed in memory, in the DB or both
# In addition, the change could have been to an order-by property, one of the
# properties in the BoolExpr, or both
if (! $next_obj_current_context->isa('UR::Object::Set') # Sets aren't really from the underlying context
and
$context->object_exists_in_underlying_context($next_obj_current_context)
) {
if ($next_obj_current_context->__changes__) {
if ($change_is_order_by_property->($next_obj_current_context)) {
# This object is expected to exist in the underlying context, has changes, and at
# least one of those changes is to an order-by property
#
# if it's in %db_seen_ids_that_are_not_deleted, then it was seen earlier
# from the DB, and can now be removed from that hash.
unless (delete $db_seen_ids_that_are_not_deleted{$next_obj_current_context_id}) {
# If not in that list, then add it to the list of things we might see later
# in the DB iterator. If we don't see it by the end if the iterator, it
# must have been deleted from the DB. At that time, we'll throw an exception.
# It's later than we'd like, since the caller has already gotten ahold of the
# object, but better late than never. The alternative is to do an id-only
# query right now, but that would be inefficient.
#
# We could avoid storing this if we could verify that the db_committed/db_saved_uncommitted
# values did NOT match the BoolExpr, but this will suffice for now.
$changed_objects_that_might_be_db_deleted{$next_obj_current_context_id} = $next_obj_current_context;
}
# In any case, return the cached object.
$next_object = $next_obj_current_context;
$next_obj_current_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
elsif ($change_is_bx_filter_property->($next_obj_current_context)) {
# The change was that the object originally did not the filter, but since being
# loaded it's been changed so it now matches the filter. The DB iterator isn't
# returning the object since the DB's copy doesn't match the filter.
delete $db_seen_ids_that_are_not_deleted{$next_obj_current_context_id};
$next_object = $next_obj_current_context;
$next_obj_current_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
else {
# The change is not an order-by property. This object must have been deleted
# from the DB. The call to __merge below will throw an exception
$context->__merge_db_data_with_existing_object($bx_subject_class, $next_obj_current_context, undef, []);
$next_obj_current_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
}
} else {
# This cached object has no changes, so the database must have changed.
# It could be deleted, no longer match the BoolExpr, or have changes in an order-by property
if (delete $db_seen_ids_that_are_not_deleted{$next_obj_current_context_id}) {
# We saw this already on the DB iterator. It's not deleted. Go ahead and return it
$next_object = $next_obj_current_context;
$next_obj_current_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
elsif ($normalized_rule->is_id_only) {
# If the query is id-only, and we didn't see the DB object at the same time, then
# the DB row must have been deleted. Changing the PK columns in the DB are logically
# the same as deleting the old object and creating/defineing a new one in UR.
#
# The __merge will delete the cached object, then pick again
$context->__merge_db_data_with_existing_object($bx_subject_class, $next_obj_current_context, undef, []);
$next_obj_current_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
} else {
# Force an ID-only query to the underying context
my $requery_obj = $context->reload($bx_subject_class, id => $next_obj_current_context_id);
if ($requery_obj) {
# In any case, the DB iterator will pull it up at the appropriate time,
# and since the object has no changes, it will be returned to the caller then.
# Discard this in-memory object and pick again
$next_obj_current_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
} else {
# We've now confirmed that the object in the DB is really gone
# NOTE: the reload() has already performed the __merge (implying deletion)
# in the above branch "elsif ($normalized_rule->is_id_only)" so we don't need
# to __merge/delete it here
$next_obj_current_context = undef;
redo PICK_NEXT_OBJECT_FOR_LOADING;
}
}
}
} else {
# The object does not exist in the underlying context. It must be
# a newly created object.
$next_object = $next_obj_current_context;
$next_obj_current_context = undef;
next PICK_NEXT_OBJECT_FOR_LOADING;
}
} elsif (!defined($next_obj_current_context)
and
!defined($next_obj_underlying_context)
) {
# Both iterators are exhausted. Bail out
$next_object = undef;
$last_loaded_id = undef;
last PICK_NEXT_OBJECT_FOR_LOADING;
} else {
# Couldn't decide which to pick next? Something has gone horribly wrong.
# We're using other vars to hold the objects and setting
# $next_obj_current_context/$next_obj_underlying_context to undef so if
# the caller is trapping exceptions, this iterator will pick new objects next time
my $current_problem_obj = $next_obj_current_context;
my $underlying_problem_obj = $next_obj_underlying_context;
$next_obj_current_context = undef;
$next_obj_underlying_context = undef;
$next_object = undef;
Carp::croak("Loading iterator internal error. Could not pick a next object for loading.\n"
. "Next object underlying context: " . Data::Dumper::Dumper($underlying_problem_obj)
. "\nNext object current context: ". Data::Dumper::Dumper($current_problem_obj));
}
return unless defined $next_object;
# end while ! $next_object
} continue {
if (defined($next_object) and defined($offset) and $offset) {
$offset--;
$next_object = undef;
}
}
$last_loaded_id = $next_object->id if (defined $next_object);
$limit-- if defined $limit;
return $next_object;
}; # end of the closure
bless $loading_iterator, $class;
Sub::Name::subname($class . '__loading_iterator_closure__', $loading_iterator);
# Inside the closure, it needs to know its own address, but without holding a real reference
# to itself - otherwise the closure would never go out of scope, the destructor would never
# get called, and the list of outstanding loaders would never get pruned. This way, the closure
# holds a reference to the string version of its address, which is the only thing it really
# needed anyway
$me_loading_iterator_as_string = $loading_iterator . '';
$all_loading_iterators{$me_loading_iterator_as_string} =
[ $me_loading_iterator_as_string,
$normalized_rule,
$object_sorter,
$cached,
\$underlying_context_objects_count,
\$cached_objects_count,
$context,
];
$is_multiple_loading_iterators = 1 if (keys(%all_loading_iterators) > 1);
return $loading_iterator;
} # end _create()
sub DESTROY {
my $self = shift;
my $iter_data = $all_loading_iterators{$self};
if ($iter_data->[0] eq $self) {
# that's me!
# Items in the listref are: $loading_iterator_string, $rule, $object_sorter, $cached,
# \$underlying_context_objects_count, \$cached_objects_count, $context
my $context = $iter_data->[6];
if ($context and $context->monitor_query) {
my $rule = $iter_data->[1];
my $count = ${$iter_data->[4]} + ${$iter_data->[5]};
$context->_log_query_for_rule($rule->subject_class_name, $rule, "QUERY: Query complete after returning $count object(s) for rule $rule.");
$context->_log_done_elapsed_time_for_rule($rule);
}
delete $all_loading_iterators{$self};
$is_multiple_loading_iterators = 0 if (keys(%all_loading_iterators) < 2);
} else {
Carp::carp('A loading iterator went out of scope, but could not be found in the registered list of iterators');
}
}
# Used by the loading itertor to inject a newly loaded object into another
# loading iterator's @$cached list. This is to handle the case where the user creates
# an iterator which will load objects from the DB. Before all the data from that
# iterator is read, another get() or iterator is created that covers (some of) the same
# objects which get pulled into the object cache, and the second request is run to
# completion. Since the underlying context iterator has been changed to never return
# objects currently cached, the first iterator would have incorrectly skipped ome objects that
# were not loaded when the first iterator was created, but later got loaded by the second.
sub _inject_object_into_other_loading_iterators {
my($self, $new_object, $iterator_to_skip) = @_;
ITERATOR:
foreach my $iter_name ( keys %all_loading_iterators ) {
next if $iter_name eq $iterator_to_skip; # That's me! Don't insert into our own @$cached this way
my($loading_iterator, $rule, $object_sorter, $cached)
= @{$all_loading_iterators{$iter_name}};
if ($rule->evaluate($new_object)) {
my $cached_list_len = @$cached;
for(my $i = 0; $i < $cached_list_len; $i++) {
my $cached_object = $cached->[$i];
next if $cached_object->isa('UR::DeletedRef');
my $comparison = $object_sorter->($new_object, $cached_object);
if ($comparison < 0) {
# The new object sorts sooner than this one. Insert it into the list
splice(@$cached, $i, 0, $new_object);
next ITERATOR;
} elsif ($comparison == 0) {
# This object is already in the list
next ITERATOR;
}
}
# It must go at the end...
push @$cached, $new_object;
}
} # end foreach
}
# Reverse of _inject_object_into_other_loading_iterators(). Used when one iterator detects that
# a previously loaded object no longer exists in the underlying context/datasource
sub _remove_object_from_other_loading_iterators {
my($self, $disappearing_object, $iterator_to_skip) = @_;
ITERATOR:
foreach my $iter_name ( keys %all_loading_iterators ) {
next if(! defined $iterator_to_skip or ($iter_name eq $iterator_to_skip)); # That's me! Don't remove into our own @$cached this way
my($loading_iterator, $rule, $object_sorter, $cached)
= @{$all_loading_iterators{$iter_name}};
next if (defined($iterator_to_skip)
and $loading_iterator eq $iterator_to_skip); # That's me! Don't insert into our own @$cached this way
if ($rule->evaluate($disappearing_object)) {
my $cached_list_len = @$cached;
for(my $i = 0; $i < $cached_list_len; $i++) {
my $cached_object = $cached->[$i];
next if $cached_object->isa('UR::DeletedRef');
my $comparison = $object_sorter->($disappearing_object, $cached_object);
if ($comparison == 0) {
# That's the one, remove it from the list
splice(@$cached, $i, 1);
next ITERATOR;
} elsif ($comparison < 0) {
# past the point where we expect to find this object
next ITERATOR;
}
}
}
} # end foreach
}
# Returns true if any of the object's changed properites are keys
# in the passed-in hashref. Used by the Loading Iterator to find out if
# a change is one of the order-by properties of a bx
sub _changed_property_in_hash {
my($self,$object,$hash) = @_;
foreach my $prop_name ( $object->_changed_property_names ) {
return 1 if (exists $hash->{$prop_name});
}
return;
}
1;