4646 BaseProcessor ,
4747 JobNotFoundError ,
4848 JobResultNotFoundError ,
49+ ProcessorExecuteError ,
4950 UnknownProcessError ,
5051)
5152from pygeoapi .util import (
5253 get_current_datetime ,
54+ is_request_allowed ,
5355 JobStatus ,
5456 ProcessExecutionMode ,
5557 RequestedProcessExecutionMode ,
@@ -105,7 +107,11 @@ def get_processor(self, process_id: str) -> BaseProcessor:
105107 except KeyError as err :
106108 raise UnknownProcessError ('Invalid process identifier' ) from err
107109 else :
108- return load_plugin ('process' , process_conf ['processor' ])
110+ pp = load_plugin ('process' , process_conf ['processor' ])
111+ pp .allow_internal_requests = process_conf .get (
112+ 'allow_internal_requests' , False )
113+
114+ return pp
109115
110116 def get_jobs (self ,
111117 status : JobStatus = None ,
@@ -395,13 +401,13 @@ def execute_process(
395401 """
396402
397403 job_id = str (uuid .uuid1 ())
398- processor = self .get_processor (process_id )
399- processor .set_job_id (job_id )
404+ self . processor = self .get_processor (process_id )
405+ self . processor .set_job_id (job_id )
400406 extra_execute_handler_parameters = {
401407 'requested_response' : requested_response
402408 }
403409
404- job_control_options = processor .metadata .get (
410+ job_control_options = self . processor .metadata .get (
405411 'jobControlOptions' , [])
406412
407413 if execution_mode == RequestedProcessExecutionMode .respond_async :
@@ -474,7 +480,7 @@ def execute_process(
474480 # TODO: handler's response could also be allowed to include more HTTP
475481 # headers
476482 mime_type , outputs , status = handler (
477- processor ,
483+ self . processor ,
478484 job_id ,
479485 data_dict ,
480486 requested_outputs ,
@@ -484,26 +490,37 @@ def execute_process(
484490
485491 def _send_in_progress_notification (self , subscriber : Optional [Subscriber ]):
486492 if subscriber and subscriber .in_progress_uri :
487- response = requests .post (subscriber .in_progress_uri , json = {})
488- LOGGER .debug (
489- f'In progress notification response: { response .status_code } '
490- )
493+ self .__do_subscriber_request (subscriber .in_progress_uri )
491494
492495 def _send_success_notification (
493496 self , subscriber : Optional [Subscriber ], outputs : Any
494497 ):
495- if subscriber :
496- response = requests .post (subscriber .success_uri , json = outputs )
497- LOGGER .debug (
498- f'Success notification response: { response .status_code } '
499- )
498+ if subscriber and subscriber .success_uri :
499+ self .__do_subscriber_request (subscriber .success_uri , outputs )
500500
501501 def _send_failed_notification (self , subscriber : Optional [Subscriber ]):
502502 if subscriber and subscriber .failed_uri :
503- response = requests .post (subscriber .failed_uri , json = {})
504- LOGGER .debug (
505- f'Failed notification response: { response .status_code } '
506- )
503+ self .__do_subscriber_request (subscriber .failed_uri )
504+
505+ def __do_subscriber_request (self , url : str , data : dict = {}) -> None :
506+ """
507+ Helper function to execute a subscriber URL via HTTP POST
508+
509+ :param url: `str` of URL
510+ :param data: `dict` of request payload
511+
512+ :returns: `None`
513+ """
514+
515+ if not is_request_allowed (url , self .processor .allow_internal_requests ):
516+ msg = 'URL not allowed'
517+ LOGGER .error (f'{ msg } : { url } ' )
518+ raise ProcessorExecuteError (msg )
519+
520+ response = requests .post (url , json = data )
521+ LOGGER .debug (
522+ f'Response: { response .status_code } '
523+ )
507524
508525 def __repr__ (self ):
509526 return f'<BaseManager> { self .name } '
0 commit comments