Commit 83de77ea authored by Jorn Bruggeman's avatar Jorn Bruggeman
Browse files

write task order to log instead of stdout; fixed pyfabm start and improved error handling

parent 9a0b2909
......@@ -2,7 +2,10 @@
from __future__ import print_function
from __future__ import unicode_literals
import sys,os,ctypes,re
import sys
import os
import ctypes
import re
try:
import numpy
......@@ -11,8 +14,8 @@ except ImportError:
sys.exit(1)
# Determine potential names of FABM dynamic library.
if os.name=='nt':
dllpaths = ('python_fabm.dll','libpython_fabm.dll')
if os.name == 'nt':
dllpaths = ('python_fabm.dll', 'libpython_fabm.dll')
elif os.name == "posix" and sys.platform == "darwin":
dllpaths = ('libpython_fabm.dylib',)
else:
......@@ -126,7 +129,7 @@ fabm.get_interior_diagnostic_data.restype = None
fabm.get_horizontal_diagnostic_data.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.POINTER(ctypes.POINTER(ctypes.c_double))]
fabm.get_horizontal_diagnostic_data.restype = None
fabm.start.argtypes = []
fabm.start.argtypes = [ctypes.c_void_p]
fabm.start.restype = None
# Routine for retrieving source-sink terms for the interior domain.
......@@ -193,12 +196,12 @@ def printTree(root,stringmapper,indent=''):
"""Print an indented tree of objects, encoded by dictionaries linking the names of children to
their subtree, or to their object. Objects are finally printed as string obtained by
calling the provided stringmapper method."""
for name,item in root.items():
if isinstance(item,dict):
print('%s%s' % (indent,name))
printTree(item,stringmapper,indent+' ')
for name, item in root.items():
if isinstance(item, dict):
print('%s%s' % (indent, name))
printTree(item, stringmapper, indent + ' ')
else:
print('%s%s = %s' % (indent,name,stringmapper(item)))
print('%s%s = %s' % (indent, name, stringmapper(item)))
class Variable(object):
def __init__(self, name=None, units=None, long_name=None, path=None, variable_pointer=None):
......@@ -207,7 +210,7 @@ class Variable(object):
strname = ctypes.create_string_buffer(ATTRIBUTE_LENGTH)
strunits = ctypes.create_string_buffer(ATTRIBUTE_LENGTH)
strlong_name = ctypes.create_string_buffer(ATTRIBUTE_LENGTH)
fabm.variable_get_metadata(variable_pointer,ATTRIBUTE_LENGTH,strname,strunits,strlong_name)
fabm.variable_get_metadata(variable_pointer, ATTRIBUTE_LENGTH, strname, strunits, strlong_name)
name = strname.value.decode('ascii')
units = strunits.value.decode('ascii')
long_name = strlong_name.value.decode('ascii')
......@@ -499,7 +502,7 @@ class Model(object):
given the current state and environment.
"""
if t is None:
t = self.ntime
t = self.itime
localrates = numpy.empty_like(self.state)
fabm.get_rates(self.pmodel, t, localrates, surface, bottom)
return localrates
......@@ -574,14 +577,16 @@ class Model(object):
parent[pathcomps[-1]] = parameter
return root
def checkReady(self, verbose=True, stop=False):
def start(self, verbose=True, stop=False):
ready = True
for dependency in self.dependencies:
if not dependency.is_set:
print('Value for dependency %s is not set.' % dependency.name)
ready = False
assert ready or not stop, 'Not all dependencies have been fulfilled.'
fabm.start()
fabm.start(self.pmodel)
if hasError():
return False
for i, variable in enumerate(self.interior_diagnostic_variables):
pdata = ctypes.POINTER(ctypes.c_double)()
fabm.get_interior_diagnostic_data(self.pmodel, i + 1, ctypes.byref(pdata))
......@@ -591,6 +596,7 @@ class Model(object):
fabm.get_horizontal_diagnostic_data(self.pmodel, i + 1, ctypes.byref(pdata))
variable.data = None if not pdata else pdata.contents
return ready
checkReady = start
def updateTime(self, nsec):
self.itime = nsec
......
......@@ -6,7 +6,7 @@ module fabm_python
!DIR$ ATTRIBUTES DLLEXPORT :: STATE_VARIABLE,DIAGNOSTIC_VARIABLE,CONSERVED_QUANTITY
use fabm, only: type_fabm_model, type_external_variable, fabm_get_version
use fabm, only: type_fabm_model, type_external_variable, fabm_get_version, status_check_ready_done
use fabm_config
use fabm_types, only:rk => rke,attribute_length,type_model_list_node,type_base_model, &
factory,type_link,type_link_list,type_internal_variable
......@@ -439,6 +439,11 @@ contains
real(c_double), pointer :: dy_(:)
call c_f_pointer(pmodel, model)
if (model%p%status < status_check_ready_done) then
call fatal_error('get_rates', 'start has not been called yet.')
return
end if
call c_f_pointer(c_loc(dy), dy_, &
(/size(model%p%state_variables) + size(model%p%surface_state_variables) + size(model%p%bottom_state_variables)/))
......@@ -479,6 +484,11 @@ contains
logical :: repair, interior_valid, surface_valid, bottom_valid
call c_f_pointer(pmodel, model)
if (model%p%status < status_check_ready_done) then
call fatal_error('check_state', 'start has not been called yet.')
return
end if
repair = int2logical(repair_)
call model%p%check_interior_state(repair, interior_valid)
call model%p%check_surface_state(repair, surface_valid)
......@@ -503,8 +513,14 @@ contains
logical :: surface, bottom
call c_f_pointer(pmodel, model)
if (ny /= size(model%p%state_variables) + size(model%p%surface_state_variables) + size(model%p%bottom_state_variables)) &
call fatal_error('integrate', 'ny is wrong length')
if (model%p%status < status_check_ready_done) then
call fatal_error('integrate', 'start has not been called yet.')
return
end if
if (ny /= size(model%p%state_variables) + size(model%p%surface_state_variables) + size(model%p%bottom_state_variables)) then
call fatal_error('integrate', 'ny is wrong length')
return
end if
call c_f_pointer(c_loc(t_), t, (/nt/))
call c_f_pointer(c_loc(y_ini_), y_ini, (/ny/))
......@@ -512,10 +528,11 @@ contains
surface = int2logical(do_surface)
bottom = int2logical(do_bottom)
if (surface .or. bottom) then
if (.not. associated(model%column_depth)) call fatal_error('get_rates', &
if ((surface .or. bottom) .and. .not. associated(model%column_depth)) then
call fatal_error('get_rates', &
'Value for environmental dependency ' // trim(model%environment_names(model%index_column_depth)) // &
' must be provided if integrate is called with the do_surface and/or do_bottom flags.')
return
end if
call model%p%link_all_interior_state_data(y_cur(1:size(model%p%state_variables)))
call model%p%link_all_surface_state_data(y_cur(size(model%p%state_variables) + 1: &
......
......@@ -78,7 +78,7 @@ module fabm
integer, parameter :: status_none = 0
integer, parameter :: status_initialize_done = 1
integer, parameter :: status_set_domain_done = 2
integer, parameter :: status_check_ready_done = 3
integer, parameter, public :: status_check_ready_done = 3
integer, parameter, public :: data_source_none = 0
integer, parameter, public :: data_source_host = 1
......@@ -913,8 +913,13 @@ contains
character(len=*), parameter :: log_prefix = 'fabm_'
integer :: log_unit, ios
if (self%status < status_set_domain_done) &
if (self%status < status_set_domain_done) then
call fatal_error('start', 'set_domain has not yet been called on this model object.')
return
elseif (self%status >= status_check_ready_done) then
! start has been called on this model before and it must have succeeded to have this status. We are done.
return
end if
ready = .true.
......@@ -974,11 +979,13 @@ contains
call self%get_diagnostics_job%request_variable(self%horizontal_diagnostic_variables(ivar)%target, store=.true.)
end do
log_unit = -1
if (fabm_log) log_unit = get_free_unit()
! Merge write indices when operations can be done in place
! This must be done after all variables are requested from the different jobs, so we know which variables
! will be retrieved (such variables cannot be merged)
if (fabm_log) then
log_unit = get_free_unit()
open(unit=log_unit, file=log_prefix // 'merges.log', action='write', status='replace', iostat=ios)
if (ios /= 0) call fatal_error('start', 'Unable to open ' // log_prefix // 'merges.log')
call merge_indices(self%root, log_unit)
......@@ -988,7 +995,12 @@ contains
end if
! Initialize all jobs. This also creates registers for the read and write caches, as well as the persistent store.
call self%job_manager%initialize(self%variable_register, self%schedules, unfulfilled_dependencies)
if (fabm_log) then
open(unit=log_unit, file=log_prefix // 'task_order.log', action='write', status='replace', iostat=ios)
if (ios /= 0) call fatal_error('start', 'Unable to open ' // log_prefix // 'task_order.log')
end if
call self%job_manager%initialize(self%variable_register, self%schedules, unfulfilled_dependencies, log_unit)
if (fabm_log) close(log_unit)
! Create persistent store. This provides memory for all variables to be stored there.
call create_store(self)
......@@ -999,7 +1011,7 @@ contains
call collect_internal_fill_values(self%variable_register%write_cache%interior, self%write_cache_fill_value, use_missing=.false.)
call collect_internal_fill_values(self%variable_register%write_cache%horizontal, self%write_cache_hz_fill_value, use_missing=.false.)
! Create global caches for exchanging information wiht BGC models.
! Create global caches for exchanging information with BGC models.
! This can only be done after collect_fill_values calls complete, because they specify what values to prefill the cache with.
call create_interior_cache(self, self%cache_int)
call create_horizontal_cache(self, self%cache_hz)
......@@ -2866,9 +2878,10 @@ end subroutine end_vertical_task
# endif
#endif
drag = 1.0_rke
call begin_horizontal_task(self,self%get_drag_job%first_task,self%cache_hz _POSTARG_HORIZONTAL_IN_)
drag = 1.0_rke
call_node => self%get_drag_job%first_task%first_call
do while (associated(call_node))
if (call_node%source == source_get_drag) call call_node%model%get_drag(self%cache_hz, drag)
......@@ -2893,9 +2906,10 @@ end subroutine end_vertical_task
# endif
#endif
albedo = 0.0_rke
call begin_horizontal_task(self,self%get_albedo_job%first_task, self%cache_hz _POSTARG_HORIZONTAL_IN_)
albedo = 0.0_rke
call_node => self%get_albedo_job%first_task%first_call
do while (associated(call_node))
if (call_node%source == source_get_albedo) call call_node%model%get_albedo(self%cache_hz, albedo)
......
......@@ -26,6 +26,7 @@ module fabm_job
type type_graph_subset_node_set
type (type_graph_subset_node_pointer), pointer :: first => null()
integer :: log_unit = -1
contains
procedure :: collect => graph_subset_node_set_collect
procedure :: collect_and_branch => graph_subset_node_set_collect_and_branch
......@@ -171,16 +172,19 @@ module fabm_job
procedure :: print => variable_register_print
end type
contains
contains
subroutine create_graph_subset_node_set(graph, set)
subroutine create_graph_subset_node_set(graph, set, log_unit)
type (type_graph), intent(in) :: graph
type (type_graph_subset_node_set), intent(out) :: set
integer, intent(in) :: log_unit
type (type_node_list_member), pointer :: graph_node
type (type_node_set_member), pointer :: dependency
type (type_graph_subset_node_pointer),pointer :: set_node, set_node2, set_node3
set%log_unit = log_unit
! Create representatives for each original graph node.
graph_node => graph%first
do while (associated(graph_node))
......@@ -304,7 +308,7 @@ module fabm_job
call self%collect(tree_node%operation, removed)
if (.not.associated(self%first)) write (*,*) ' - ' // trim(tree_node%to_string())
if (self%log_unit /= -1 .and. .not. associated(self%first)) write (self%log_unit,'(a,a)') ' - ', trim(tree_node%to_string())
! We have processed all graph end points with compatible sources.
! Now process end points with other sources.
......@@ -1013,8 +1017,9 @@ subroutine job_create_graph(self, variable_register)
self%state = job_state_graph_created
end subroutine job_create_graph
subroutine job_create_tasks(self)
class (type_job),target,intent(inout) :: self
subroutine job_create_tasks(self, log_unit)
class (type_job), target, intent(inout) :: self
integer, intent(in) :: log_unit
type (type_job_node), pointer :: job_node
type (type_graph_subset_node_set) :: subset
......@@ -1027,7 +1032,7 @@ subroutine job_create_tasks(self)
_ASSERT_(self%state < job_state_tasks_created, 'job_create_tasks', trim(self%name)//': tasks for this job have already been created.')
_ASSERT_(self%state >= job_state_graph_created, 'job_create_tasks', trim(self%name)//': the graph for this job have not been created yet.')
write (*,'(a)') trim(self%name)
if (log_unit /= -1) write (log_unit,'(a)') trim(self%name)
! If we are linked to an earlier called job, make sure its task list has already been created.
! This is essential if we will try to outsource our own calls to previous tasks/jobs.
......@@ -1038,8 +1043,8 @@ subroutine job_create_tasks(self)
end do
! Create tree that describes all possible task orders (root is at the right of the call list, i.e., at the end of the very last call)
write (*,'(a)') '- possible task orders:'
call create_graph_subset_node_set(self%graph, subset)
if (log_unit /= -1) write (log_unit,'(a)') ' possible task orders:'
call create_graph_subset_node_set(self%graph, subset, log_unit)
root%operation = self%operation
if (root%operation /= source_unknown) then
call subset%collect_and_branch(root)
......@@ -1048,13 +1053,16 @@ subroutine job_create_tasks(self)
end if
! Determine optimum task order and use it to create the task objects.
ntasks = root%get_minimum_depth() - 1
write (*,'(a,i0,a)') '- best task order contains ',ntasks,' tasks.'
leaf => root%get_leaf_at_depth(ntasks + 1)
write (*,'(a,a)') '- best task order: ',trim(leaf%to_string())
ntasks = root%get_minimum_depth()
leaf => root%get_leaf_at_depth(ntasks)
call create_tasks(leaf)
_ASSERT_(.not. associated(subset%first), 'job_select_order', 'BUG: graph subset should be empty after create_tasks.')
if (log_unit /= -1) then
if (root%operation == source_unknown) ntasks = ntasks - 1
write (log_unit,'(a,i0,a,a)') ' best task order (', ntasks, ' tasks): ', trim(leaf%to_string())
end if
if (self%outsource_tasks) then
task => self%first_task
do while (associated(task))
......@@ -1068,7 +1076,9 @@ subroutine job_create_tasks(self)
end do
end if
_ASSERT_(self%operation == source_unknown .or. .not. associated(self%first_task%next), 'job_select_order', 'Multiple tasks created while only one source was acceptable.')
if (associated(self%first_task)) then
_ASSERT_(self%operation == source_unknown .or. .not. associated(self%first_task%next), 'job_select_order', 'Multiple tasks created while only one source was acceptable.')
end if
self%state = job_state_tasks_created
......@@ -1087,10 +1097,10 @@ contains
if (associated(tree_node%parent)) call create_tasks(tree_node%parent)
! If we have a root that does NOT represent a task itself, we are done.
if (tree_node%operation==source_unknown) return
if (tree_node%operation == source_unknown) return
! Collect all nodes that can be processed with the currently selected source.
call subset%collect(tree_node%operation,removed)
call subset%collect(tree_node%operation, removed)
! Create the task and preprend it to the list.
allocate(task)
......@@ -1521,11 +1531,12 @@ subroutine check_graph_duplicates(self)
end do
end subroutine check_graph_duplicates
subroutine job_manager_initialize(self, variable_register, schedules, unfulfilled_dependencies)
subroutine job_manager_initialize(self, variable_register, schedules, unfulfilled_dependencies, log_unit)
class (type_job_manager), intent(inout) :: self
type (type_global_variable_register), intent(inout) :: variable_register
type (type_schedules), intent(inout) :: schedules
type (type_variable_set), intent(out) :: unfulfilled_dependencies
integer, intent(in) :: log_unit
type (type_job_node), pointer :: node, first_ordered
......@@ -1557,7 +1568,7 @@ subroutine job_manager_initialize(self, variable_register, schedules, unfulfille
! Create tasks. This must be done for all jobs before job_finalize_prefill_settings is called, as this API operates across all jobs.
node => self%first
do while (associated(node))
call job_create_tasks(node%p)
call job_create_tasks(node%p, log_unit)
node => node%next
end do
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment