. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2023, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. 
+Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool), 94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri), 97 [ uri_components/2, 98 uri_query_components/2, 99 uri_data/3, 100 uri_data/4, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json/1]). 115 116:- if(exists_source(library(uuid))). 117:- autoload(library(uuid), [uuid/2]). 
:- endif.


%   Module-sensitive arguments of the public API.  `:` marks the
%   module-qualified option list of pengine_create/1 (unpacked there as
%   `M:Options0`); `0` and `1` mark the `:Goal` and `:Closure` arguments
%   documented in the export list.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, 0, +),
    pengine_event_loop(1, +).

%   Hooks that other modules may extend.
:- multifile
    write_result/3,                 % +Format, +Event, +Dict
    event_to_json/3,                % +Event, -JSON, +Format
    prepare_module/3,               % +Module, +Application, +Options
    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
    authentication_hook/3,          % +Request, +Application, -User
    not_sandboxed/2.                % +User, +App

%   Option declarations.  The second argument of predicate_options/3 is
%   the position of the option list in the predicate's argument vector.
:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer;oneof([false])),
                       bindings(list),
                       src_list(list),
                       src_text(any),           % text
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer;oneof([false])),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
%   pengine_respond(+Pengine, +Input, +Options): the option list is the
%   third argument.
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer;oneof([false])),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ listen(atom),
                       pass_to(system:thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).                       % not yet implemented

% :- debug(pengine(transition)).
:- debug(pengine(debug)).       % handle pengine_debug in pengine_rpc/3.

%   Compile calls to `random_delay` away unless the debug topic
%   pengine(delay) is enabled while this file is loaded; in that case
%   they become calls to do_random_delay/0.
goal_expansion(random_delay, Expanded) :-
    (   debugging(pengine(delay))
    ->  Expanded = do_random_delay
    ;   Expanded = true
    ).

%!  do_random_delay is det.
%
%   Sleep for random(20)/1000 seconds, i.e., less than 20 milliseconds.
do_random_delay :-
    Delay is random(20)/1000,
    sleep(Delay).
%   Argument modes of the internal meta predicates.  The third argument
%   of solve/4 and findnsols_no_empty/4 is the goal that is called.
%   NOTE(review): pengine_event_loop/3 is defined outside this section;
%   its closure is assumed to be the second argument and to be called
%   with one extra argument, mirroring pengine_event_loop/2 -- confirm.
:- meta_predicate                       % internal meta predicates
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
%!  pengine_create(:Options) is det.
%
%   Create a pengine.  Source options are first normalised by
%   translate_local_sources/3.  If the resulting options contain
%   server(URL), a remote pengine is created on that server and the
%   remaining options are handed to it; otherwise a local pengine is
%   created.
pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    (   select_option(server(ServerURL), Options, RemoteOptions)
    ->  remote_pengine_create(ServerURL, RemoteOptions)
    ;   local_pengine_create(Options)
    ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
%
%   Replace all src_predicates(PIs), src_list(Terms) and src_text(Text)
%   options in OptionsIn by at most one src_text(Text) option that is
%   placed in front of the remaining options.  Multiple sources are
%   concatenated in the order in which they appear.
translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Sources, OtherOptions, Module),
    add_src_text(Sources, OtherOptions, Options).

%   add_src_text(+Sources, +Options0, -Options)
%
%   Prepend a single src_text/1 option holding all Sources, if any.
add_src_text(Sources, Options0, Options) :-
    Sources == [],
    !,
    Options = Options0.
add_src_text(Sources, Options0, Options) :-
    Sources = [Text],
    !,
    Options = [src_text(Text)|Options0].
add_src_text(Sources, Options0, Options) :-
    atomics_to_string(Sources, Text),
    !,
    Options = [src_text(Text)|Options0].

%   translate_local_sources(+OptionsIn, -Sources, -OtherOptions, +Module)
%
%   Split OptionsIn into the text of all source options and the
%   options that do not specify source.
translate_local_sources([], [], [], _).
translate_local_sources([Option|Options], Sources, Others, M) :-
    (   nonvar(Option),
        translate_local_source(Option, Text, M)
    ->  Sources = [Text|Sources1],
        Others = Others1
    ;   Sources = Sources1,
        Others = [Option|Others1]
    ),
    translate_local_sources(Options, Sources1, Others1, M).

%   translate_local_source(+Option, -Text, +Module)
%
%   True when Option is a source option and Text is its source text.
translate_local_source(src_predicates(PIs), Text, M) :-
    must_be(list, PIs),
    with_output_to(string(Text),
                   maplist(list_in_module(M), PIs)).
translate_local_source(src_list(Terms), Text, _) :-
    must_be(list, Terms),
    with_output_to(string(Text),
                   forall(member(Term, Terms),
                          format('~k .~n', [Term]))).
translate_local_source(src_text(Text), Text, _).

%   List the predicate PI as it is defined in module M.
list_in_module(M, PI) :-
    listing(M:PI).
pengine_send(NameOrID, Term, [])
.
*/
%!  pengine_send(+NameOrID, +Term) is det.
%
%   Send Term to the pengine NameOrID without options.  Same as
%   pengine_send(NameOrID, Term, []).
pengine_send(NameOrID, Term) :-
    NoOptions = [],
    pengine_send(NameOrID, Term, NoOptions).
Any remaining options are passed to http_open/3. */
%!  pengine_send(+NameOrID, +Term, +Options) is det.
%
%   Send Term as a request to the pengine NameOrID, which must be an
%   atom.  The name `self` addresses the message queue of the calling
%   thread.  Other names are first looked up as an alias in child/2
%   and otherwise used as the pengine id.  Options:
%
%     - delay(+Time)
%       Send the message after Time, using alarm/4.
%
%   @error existence_error(pengine, Id) if the target is neither a
%   known remote pengine nor a known local pengine thread.
pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

pengine_send2(self, Event, Options) :-
    !,
    thread_self(Me),
    delay_message(queue(Me), Event, Options).
pengine_send2(NameOrID, Event, Options) :-
    (   child(NameOrID, Child)          % resolve alias
    ->  Pengine = Child
    ;   Pengine = NameOrID
    ),
    delay_message(pengine(Pengine), Event, Options).

%   delay_message(+Target, +Event, +Options)
%
%   Send Event right away, or schedule sending it if the delay(Time)
%   option is present.
delay_message(Target, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Target, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Target, Event, Options)
    ).

%   send_message(+Target, +Event, +Options)
%
%   Deliver Event to a thread message queue or to a pengine.
send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Pengine), Event, Options) :-
    send_to_pengine(Pengine, Event, Options).

send_to_pengine(Pengine, Event, Options) :-
    pengine_remote(Pengine, Server),
    !,
    remote_pengine_send(Server, Pengine, Event, Options).
send_to_pengine(Pengine, Event, _Options) :-
    pengine_thread(Pengine, Thread),
    !,
    thread_send_message(Thread, pengine_request(Event)).
send_to_pengine(Pengine, _Event, _Options) :-
    existence_error(pengine, Pengine).
%!  pengine_request(-Request) is det.
%
%   Wait for the next pengine_request(Request) message in the queue of
%   the calling thread.  The thread first waits normally for one
%   second.  If nothing arrives it waits for the remainder of the
%   `idle_limit` setting of the pengine's application while using
%   thread_idle/2 to minimise resources.  If there is still no request,
%   Request is unified with `destroy`.
pengine_request(Request) :-
    thread_self(Me),
    (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
    ->  true
    ;   pengine_self(Pengine),
        get_pengine_application(Pengine, Application),
        setting(Application:idle_limit, Limit),
        Remaining is Limit-1,           % one second already passed
        thread_self(Thread),
        (   thread_idle(thread_get_message(Thread, pengine_request(Request),
                                           [timeout(Remaining)]),
                        long)
        ->  true
        ;   Request = destroy
        )
    ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
%!  pengine_reply(+Event) is det.
%!  pengine_reply(+Queue, +Event) is det.
%
%   Send Event as pengine_event(ID, Event) to Queue, which defaults to
%   the parent queue of the calling pengine.  ID is the first argument
%   of Event.  If the sender is the pengine ID itself and it is not
%   detached, sending is limited by the `idle_limit` setting of its
%   application; on timeout the thread flags itself, detaches and
%   calls abort/0.  Once that flag is set, replies are silently
%   dropped.
pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

pengine_reply(Queue, Event0) :-
    (   nb_current(pengine_idle_limit_exceeded, true)
    ->  true
    ;   arg(1, Event0, ID),
        wrap_first_answer(ID, Event0, Event),
        random_delay,
        debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
        pengine_reply_event(Queue, ID, Event)
    ).

%   pengine_reply_event(+Queue, +ID, +Event)
%
%   Put the event in Queue, guarded by the idle limit if we are the
%   (attached) pengine ID.
pengine_reply_event(Queue, ID, Event) :-
    pengine_self(ID),
    \+ pengine_detached(ID, _),
    !,
    get_pengine_application(ID, Application),
    setting(Application:idle_limit, IdleLimit),
    debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
    (   thread_send_message(Queue, pengine_event(ID, Event),
                            [ timeout(IdleLimit)
                            ])
    ->  true
    ;   thread_self(Me),
        debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
              [ID, Me]),
        nb_setval(pengine_idle_limit_exceeded, true),
        thread_detach(Me),
        abort
    ).
pengine_reply_event(Queue, ID, Event) :-
    thread_send_message(Queue, pengine_event(ID, Event)).

%   wrap_first_answer(+ID, +Event0, -Event)
%
%   If a create event for ID is pending in
%   wrap_first_answer_in_create_event/2, Event is that create event
%   with Event0 embedded as answer(Event0) and the pending fact is
%   removed.  Otherwise Event is Event0.
wrap_first_answer(ID, Event0, Event) :-
    Extra = [answer(Event0)],
    (   wrap_first_answer_in_create_event(CreateEvent, Extra),
        arg(1, CreateEvent, ID)
    ->  retract(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        Event = CreateEvent
    ;   Event = Event0
    ).


%   Discard all messages that are currently in the parent queue.
empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

empty_queue(Queue, Count0, Count) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  Count1 is Count0+1,
        empty_queue(Queue, Count1, Count)
    ;   Count = Count0
    ).
Options is a list of options:
false
, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar to using set_prolog_flag(toplevel_mode, recursive)
.Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, were we to call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
%!  pengine_ask(+NameOrID, @Query, +Options) is det.
%
%   Ask the pengine NameOrID to run Query.  Options accepted by
%   pengine_ask_option/1 travel with the request as ask(Query,
%   AskOptions); all other options are passed to pengine_send/3.
pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, ForAsk, ForSend),
    Request = ask(Query, ForAsk),
    pengine_send(ID, Request, ForSend).


%   pengine_ask_option(?Option)
%
%   True if Option is interpreted by the pengine that runs the query
%   rather than by pengine_send/3.
pengine_ask_option(template(_Template)).
pengine_ask_option(chunk(_Count)).
pengine_ask_option(bindings(_Bindings)).
pengine_ask_option(breakpoints(_Breakpoints)).
chunk(false)
.Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
%!  pengine_next(+NameOrID, +Options) is det.
%
%   Ask the pengine NameOrID for the next solution(s).  If Options
%   contains chunk(Count) the request is next(Count), otherwise it is
%   `next`.  The (remaining) options are passed to pengine_send/3.
pengine_next(ID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  pengine_send(ID, next(Count), SendOptions)
    ;   pengine_send(ID, next, Options)
    ).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
%!  pengine_stop(+NameOrID, +Options) is det.
%
%   Send the `stop` request to the pengine NameOrID.  Options are
%   passed to pengine_send/3.
pengine_stop(ID, Options) :-
    Request = stop,
    pengine_send(ID, Request, Options).
%!  pengine_abort(+NameOrID) is det.
%
%   Abort the query that is running in the pengine NameOrID.  A remote
%   pengine is aborted through remote_pengine_abort/3.  For a local
%   pengine the exception `abort_query` is injected in its thread using
%   thread_signal/2; errors from signalling are ignored.
pengine_abort(NameOrID) :-
    (   child(NameOrID, Child)          % resolve alias
    ->  Pengine = Child
    ;   Pengine = NameOrID
    ),
    abort_pengine(Pengine).

abort_pengine(Pengine) :-
    pengine_remote(Pengine, Server),
    !,
    remote_pengine_abort(Server, Pengine, []).
abort_pengine(Pengine) :-
    pengine_thread(Pengine, Thread),
    debug(pengine(abort), 'Signalling thread ~p', [Thread]),
    catch(thread_signal(Thread, throw(abort_query)), _, true).
%!  pengine_destroy(+NameOrID) is det.
%!  pengine_destroy(+NameOrID, +Options) is det.
%
%   Destroy the pengine NameOrID.  By default a `destroy` request is
%   sent using pengine_send/2; if that raises an existence error for
%   the pengine, its child/2 registration is removed instead.  With the
%   option force(true), the pengine is killed using abort/0 (signalled
%   to its thread, if it has one) and pengine_destroy/2 succeeds.
pengine_destroy(NameOrID) :-
    pengine_destroy(NameOrID, []).

pengine_destroy(NameOrID, Options) :-
    (   option(force(true), Options)
    ->  (   child(NameOrID, Child)      % resolve alias
        ->  Pengine = Child
        ;   Pengine = NameOrID
        ),
        (   pengine_thread(Pengine, Thread)
        ->  catch(thread_signal(Thread, abort),
                  error(existence_error(thread, _), _), true)
        ;   true
        )
    ;   catch(pengine_send(NameOrID, destroy),
              error(existence_error(pengine, NameOrID), _),
              retractall(child(_,NameOrID)))
    ).


/*================= pengines administration =======================
*/
thread(ThreadId)
remote(URL)
%   Administration of pengines.  These tables are dynamic and
%   volatile.  current_pengine/6 is filled by pengine_register_local/6
%   and pengine_register_remote/4; its second argument is the queue to
%   which the pengine sends its events and its third argument is 0 for
%   remote pengines.
:- dynamic
    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
    pengine_queue/4,                % Id, Queue, TimeOut, Time
    output_queue/3,                 % Id, Queue, Time
    pengine_user/2,                 % Id, User
    pengine_data/2,                 % Id, Data
    pengine_detached/2.             % Id, Data
:- volatile
    current_pengine/6,
    pengine_queue/4,
    output_queue/3,
    pengine_user/2,
    pengine_data/2,
    pengine_detached/2.

%   Each thread keeps its own set of children: alias name and pengine
%   id of the pengines it created.
:- thread_local
    child/2.                        % ?Name, ?Child
%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +Application, +Destroy) is det.
%!  pengine_register_remote(+Id, +URL, +Application, +Destroy) is det.
%
%   Add a pengine to the current_pengine/6 table.  A remote pengine is
%   registered with thread 0 and the calling thread as its queue.
pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    Record = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
    asserta(Record).

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Queue),
    Record = current_pengine(Id, Queue, 0, URL, Application, Destroy),
    asserta(Record).
%!  pengine_unregister(+Id) is det.
%
%   Remove the registration of pengine Id that runs in the calling
%   thread, together with its pengine_user/2 and pengine_data/2 facts.
%   If the pengine is registered with URL `http`, its queue is the
%   message queue used to send events to the HTTP workers; that queue
%   is first destroyed while holding the mutex `pengine`.
pengine_unregister(Id) :-
    thread_self(Me),
    destroy_http_queue(Id, Me),
    retractall(current_pengine(Id, _, Me, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

destroy_http_queue(Id, Thread) :-
    current_pengine(Id, Queue, Thread, http, _, _),
    !,
    with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)).
destroy_http_queue(_, _).

%!  pengine_unregister_remote(+Id) is det.
%
%   Remove the registration of the remote pengine Id.
pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
%!  pengine_self(-Id) is semidet.
%
%   True if the calling thread is registered as the thread of pengine
%   Id.
pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _, Me, _, _, _).

%   The parent queue is kept in the global variable `pengine_parent`.
pengine_parent(Parent) :-
    nb_getval(pengine_parent, Parent).

%   Thread is the thread of the local pengine Pengine.
pengine_thread(Pengine, Thread) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0,
    !.

%   Pengine is a remote pengine (registered with thread 0) at URL.
pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _, 0, URL, _, _).

get_pengine_application(Pengine, Application) :-
    current_pengine(Pengine, _, _, _, Application, _),
    !.

%   The module of a pengine is named after the pengine itself.
get_pengine_module(Pengine, Pengine).

%   pengine_uuid(-Id)
%
%   Create a fresh identifier: a random (version 4) UUID if uuid/2 is
%   available and otherwise a random integer converted to an atom.
:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).             % Version 4 is random.
:- else.
pengine_uuid(Id) :-
    (   current_prolog_flag(max_integer, MaxInt)
    ->  High is MaxInt-1
    ;   High is 1<<128
    ),
    random_between(0, High, Number),
    atom_number(Id, Number).
:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
%   Goal (second argument) is called as is; Id is a plain argument.
:- meta_predicate protect_pengine(+, 0).

%!  protect_pengine(+Id, :Goal) is semidet.
%
%   Run Goal while holding a mutex that is derived from Id.  The mutex
%   is one of 64 (`pengine_done_0` ... `pengine_done_63`), selected by
%   the term hash of Id.  Goal is also run if the pengine no longer
%   exists: both branches of the conditional call Goal.
protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock,
               (   pengine_thread(Id, _)
               ->  Goal
               ;   Goal
               )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
%!  pengine_application(+Application) is det.
%
%   May only be used as a directive, where it is handled by term
%   expansion.  Calling it as a goal raises a context error.
%
%   @error context_error(nodirective, pengine_application(Application))
pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.
%!  current_pengine_application(?Application) is nondet.
%
%   True when Application is registered in current_application/1.
current_pengine_application(Application) :-
    current_application(Application).


% Default settings for all applications.  They are defined in this
% module; the settings that are created for each application refer to
% them as their default value.

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').
%   Expand the directive `:- pengine_application(Application)` into a
%   current_application/1 fact for this module plus the expansion of a
%   setting/4 directive for each application setting.  Each setting
%   defaults to the setting with the same name in module `pengines`.
%
%   The clause must be added to system:term_expansion/2, which is
%   declared multifile for this purpose.
%
%   @error permission_error(create, pengine_application, Application)
%   if Application is an existing module that is loaded from a file.
system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    findall(setting(Name, Type, Comment),
            application_setting(Name, Type, Comment),
            Specs),
    maplist(expand_application_setting(Application), Specs, Settings),
    flatten([ pengines:current_application(Application)
            | Settings
            ], Expanded).

%   expand_application_setting(+Application, +Spec, -Expanded)
%
%   Expanded is the term expansion of the setting/4 directive that
%   defines the setting described by Spec for Application.
expand_application_setting(Application, setting(Name, Type, Comment),
                           Expanded) :-
    expand_term((:- setting(Application:Name, Type,
                            setting(pengines:Name),
                            Comment)),
                Expanded).

%   application_setting(?Name, ?Type, ?Comment)
%
%   Settings that are created for each pengine application, in the
%   order in which they are defined.
application_setting(thread_pool_size, integer,
    'Maximum number of pengines this application can run.').
application_setting(thread_pool_stacks, list(compound),
    'Maximum stack sizes for pengines this application can run.').
application_setting(slave_limit, integer,
    'Maximum number of local slave pengines a master pengine can create.').
application_setting(time_limit, number,
    'Maximum time to wait for output').
application_setting(idle_limit, number,
    'Pengine auto-destroys when idle for this time').
application_setting(safe_goal_limit, number,
    'Maximum time to try proving safety of the goal').
application_setting(program_space, integer,
    'Maximum memory used by predicates').
application_setting(allow_from, list(atom),
    'IP addresses from which remotes are allowed to connect').
application_setting(deny_from, list(atom),
    'IP addresses from which remotes are NOT allowed to connect').
application_setting(debug_info, boolean,
    'Keep information to support source-level debugging').

% Register default application

:- pengine_application(pengine_sandbox).
%!  pengine_property(?Pengine, ?Property) is nondet.
%
%   True when Property is a property of the pengine with id Pengine.
%   The call is deterministic if both arguments are instantiated.
%   Defined properties:
%
%     - self(Id) and module(Id)
%       Id is the id of the pengine.
%     - alias(Name)
%       Name under which the pengine is known to the calling thread
%       (the `alias` option when creating the pengine), provided it
%       differs from the id.
%     - thread(Thread)
%       Thread of a local pengine.
%     - remote(Server)
%       Server of a remote pengine.
%     - application(Application)
%     - destroy(Destroy)
%       Destroy is `true` if the pengine is destroyed automatically
%       after completing the query.
%     - parent(Queue)
%       Message queue to which the pengine sends its events.
%     - source(SourceID, Source)
%       Taken from pengine_data/2.  NOTE(review): presumably only
%       present if the setting `debug_info` is enabled -- confirm.
%     - detached(When)
%       Taken from pengine_detached/2.
pengine_property(Id, Property) :-
    nonvar(Id),
    nonvar(Property),
    pengine_property2(Property, Id),
    !.
pengine_property(Id, Property) :-
    pengine_property2(Property, Id).

pengine_property2(self(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(module(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(alias(Alias), Id) :-
    child(Alias, Id),
    Alias \== Id.
pengine_property2(thread(Thread), Id) :-
    current_pengine(Id, _, Thread, _, _, _),
    Thread \== 0.
pengine_property2(remote(Server), Id) :-
    current_pengine(Id, _, 0, Server, _, _).
pengine_property2(application(Application), Id) :-
    current_pengine(Id, _, _, _, Application, _).
pengine_property2(destroy(Destroy), Id) :-
    current_pengine(Id, _, _, _, _, Destroy).
pengine_property2(parent(Parent), Id) :-
    current_pengine(Id, Parent, _, _, _, _).
pengine_property2(source(SourceID, Source), Id) :-
    pengine_data(Id, source(SourceID, Source)).
pengine_property2(detached(When), Id) :-
    pengine_detached(Id, When).
%!  pengine_output(+Term) is det.
%
%   Send the event output(Self, Term) to the parent of the calling
%   pengine, where Self is the id of that pengine.
pengine_output(Term) :-
    pengine_self(Self),
    Event = output(Self, Term),
    pengine_reply(Event).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
%!  pengine_debug(+Format, +Args) is det.
%
%   Send the event debug(Self, Message) to the parent of the calling
%   pengine.  Message is the result of formatting Format and Args,
%   provided the format/3 call is considered safe by safe_goal/1.  If
%   safe_goal/1 raises an exception, Message is the text of that
%   exception instead.
pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Self),
    pengine_debug_message(Format, Args, Message),
    pengine_reply(Queue, debug(Self, Message)).

pengine_debug_message(Format, Args, Message) :-
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   var(Error)
    ->  format(atom(Message), Format, Args)
    ;   message_to_string(Error, Message)
    ).


/*================= Local pengine =======================
*/
%!  local_pengine_create(+Options) is det.
%
%   Create a pengine that runs in this process, with the calling
%   thread as its parent queue.  The application defaults to
%   `pengine_sandbox`.  The new pengine is recorded in child/2 under
%   the alias(Name) option, or under its own id if no alias is given.
local_pengine_create(Options) :-
    thread_self(Parent),
    option(application(Application), Options, pengine_sandbox),
    create(Parent, Pengine, Options, local, Application),
    option(alias(Alias), Options, Pengine),
    assert(child(Alias, Pengine)).
:- multifile thread_pool:create_pool/1.

%!  thread_pool:create_pool(+Application) is semidet.
%
%   Hook for library(thread_pool) that creates the thread pool for a
%   pengine application.  The pool is named after the application and
%   configured from the `thread_pool_size` and `thread_pool_stacks`
%   settings of that application.  Fails if Application is not a
%   registered pengine application.
thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
%!  create(+Queue, ?Child, +Options, +URL, +Application) is det.
%
%   Create a pengine thread for Application that sends its events to
%   Queue.  Child is the id of the new pengine; an id is generated if
%   Child is unbound.  For URL `local` errors are propagated to the
%   caller; otherwise an error is sent to Queue as error(Child, Error).
create(Queue, Child, Options, local, Application) :-
    !,
    pengine_child_id(Child),
    create0(Queue, Child, Options, local, Application).
create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    catch(create0(Queue, Child, Options, URL, Application),
          Error,
          create_error(Queue, Child, Error)).

pengine_child_id(Child) :-
    nonvar(Child),
    !.
pengine_child_id(Child) :-
    pengine_uuid(Child).

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).

%   create0(+Queue, +Child, +Options, +URL, +Application)
%
%   Validate the request, start the thread in the thread pool of
%   Application, register the pengine and tell the new thread its id
%   using a pengine_registered(Child) message.  Options that are not
%   pengine options are passed to thread_create_in_pool/4.
create0(Queue, Child, Options, URL, Application) :-
    create_check_application(Application),
    create_check_slave_limit(URL, Application),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), Thread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, Thread, Queue, URL, Application, Destroy),
    thread_send_message(Thread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

create_check_application(Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ).

%   The `slave_limit` setting bounds the number of children of the
%   calling thread.  It is not applied if URL is `http`: in that case
%   the pengine is _not_ a child of the HTTP server thread.
create_check_slave_limit(URL, _Application) :-
    URL == http,
    !.
create_check_slave_limit(_URL, Application) :-
    aggregate_all(count, child(_,_), Count),
    setting(Application:slave_limit, Max),
    (   Count >= Max
    ->  throw(error(resource_error(max_pengines), _))
    ;   true
    ).

%   Options that are processed by the pengine.
pengine_create_option(src_text(_Text)).
pengine_create_option(src_url(_URL)).
pengine_create_option(application(_Application)).
pengine_create_option(destroy(_Boolean)).
pengine_create_option(ask(_Query)).
pengine_create_option(template(_Template)).
pengine_create_option(bindings(_Bindings)).
pengine_create_option(chunk(_Count)).
pengine_create_option(alias(_Alias)).
pengine_create_option(user(_User)).
%!  pengine_done is det.
%
%   Called from the pengine thread `at_exit` option.  If the thread
%   died from the '$aborted' exception, it is detached and the parent
%   is sent destroy(Pengine, abort(Pengine)); errors from sending this
%   event are ignored.  Then destroys child pengines using
%   pengine_destroy/1 and removes the registration of the pengine.
%   Cleaning up the Pengine is synchronised by the `pengine_done`
%   mutex (see protect_pengine/2).  See read_event/6.
:- public
    pengine_done/0.

pengine_done :-
    thread_self(Thread),
    (   thread_property(Thread, status(exception('$aborted'))),
        thread_detach(Thread),
        pengine_self(Aborted)
    ->  catch(pengine_reply(destroy(Aborted, abort(Aborted))),
              error(_,_), true)
    ;   true
    ),
    forall(child(_Alias, Child),
           pengine_destroy(Child)),
    pengine_self(Self),
    protect_pengine(Self, pengine_unregister(Self)).
%   Pending create event into which the first answer must be wrapped:
%   wrap_first_answer_in_create_event(CreateEvent, Extra), where Extra
%   is the open tail of the option list of CreateEvent.
:- thread_local wrap_first_answer_in_create_event/2.

%   The first argument of pengine_prepare_source/2 is module
%   sensitive: it is unpacked as Module:Application.
:- meta_predicate
    pengine_prepare_source(:, +).

%!  pengine_main(+Parent, +Options, +Application)
%
%   Main predicate of a pengine thread.  It waits for the
%   pengine_registered(Self) message that carries its id, stores Parent
%   in the global variable `pengine_parent`, registers the user and
%   enables the Prolog flag `mitigate_spectre`.  The source is prepared
%   and the pengine is run inside a temporary module named Self.  If
%   `prepare_source_failed` is thrown, the pengine is terminated.
pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).

%   pengine_create_and_loop(+Self, +Application, +Options)
%
%   Report creation of the pengine and enter the main loop.  Without
%   an ask(Query) option, the create event is sent immediately.  With
%   ask(Query), the create event is kept pending such that the first
%   answer is wrapped into it, and the query is submitted using
%   pengine_ask/3 (default chunk size 1).  A query given as a string is
%   parsed first; a parse error is reported and raises
%   `prepare_source_failed`.
pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
%!  ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det.
%
%   Parse a query that is given as text in the context of Module.
%   AskSpec is either QueryText or QueryText-TemplateText.  In the
%   latter case both are parsed together, such that they share
%   variables, and Bindings holds the `Name = Var` pairs of the
%   variables that appear in the template.  Without a template text,
%   Bindings holds the variables of the query that are not excluded by
%   anon/1 and Template is a dict with tag `swish_default_template`
%   created from these bindings.
ask_to_term(QueryText-TemplateText, Module, Query, Template, Bindings) :-
    !,
    format(string(Text), 't((~s),(~s))', [TemplateText, QueryText]),
    term_string(t(Template,Query), Text,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(QueryText, Module, Query, Template, Bindings) :-
    term_string(Query, QueryText,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).

%   template_bindings(+Term, +Bindings)//
%
%   Emit the binding of each variable found in Term, walking lists and
%   the arguments of compound terms from left to right.
template_bindings(Var, Bindings) -->
    { var(Var) },
    !,
    (   { var_binding(Bindings, Var, Binding) }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], Bindings) -->
    !,
    template_bindings(Head, Bindings),
    template_bindings(Tail, Bindings).
template_bindings(Compound, Bindings) -->
    { compound(Compound),
      !,
      compound_name_arguments(Compound, _Name, Arguments)
    },
    template_bindings(Arguments, Bindings).
template_bindings(_, _) -->
    [].

%   Binding is the first element of Bindings whose second argument is
%   identical to Var.
var_binding(Bindings, Var, Binding) :-
    member(Binding, Bindings),
    arg(2, Binding, Value),
    Value == Var,
    !.
%!  fix_streams is det.
%
%   If `current_output` is a CGI stream, make the alias
%   `current_output` refer to `user_output` instead.
fix_streams :-
    fix_stream(current_output).

fix_stream(Alias) :-
    (   is_cgi_stream(Alias)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Alias]),
        set_stream(user_output, alias(Alias))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Prepare the module of a pengine.  The module qualifier of the
%   first argument is that module.  Its program space is limited by
%   the `program_space` setting of Application, `user` is removed as
%   import module and Application is added as first import module.
%   Then the module is filled by prep_module/3.  If that raises an
%   exception, it is reported using send_error/1 and
%   `prepare_source_failed` is thrown.
pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   var(Error)
    ->  true
    ;   send_error(Error),
        throw(prepare_source_failed)
    ).

%   prep_module(+Module, +Application, +Options)
%
%   Copy the module-sensitive flags from Application, call all
%   solutions of the hook prepare_module/3 and load the source options
%   with Module as source module.
prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).

%   copy_flag(+Module, +Application, +Flag)
%
%   Give Flag in Module the value it has in Application.  Succeeds
%   without doing anything if Flag has no value in Application.  The
%   flag is addressed as Module:Flag; using a single unbound variable
%   here would enumerate an arbitrary flag and make set_prolog_flag/2
%   raise an instantiation error.
copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

%   process_create_option(+Module, +Option)
%
%   Load the source specified by the src_text(Text) and src_url(URL)
%   options into Module.  Other options are ignored.
process_create_option(Module, src_text(Text)) :-
    !,
    pengine_src_text(Text, Module).
process_create_option(Module, src_url(URL)) :-
    !,
    pengine_src_url(URL, Module).
process_create_option(_, _).
%   (End of the preceding documentation: ... the `src_text` and
%   `src_url` options.)

%!  pengine_main_loop(+Pengine)
%
%   Run the request loop of Pengine.  If the exception `abort_query`
%   escapes from the loop, the abort is handled by pengine_aborted/1.
pengine_main_loop(Pengine) :-
    catch(guarded_main_loop(Pengine),
          abort_query,
          pengine_aborted(Pengine)).

%   Handle an aborted query: discard the messages in the parent queue
%   and report abort(Pengine), which either destroys the pengine or
%   resumes the main loop.
pengine_aborted(Pengine) :-
    thread_self(Thread),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [Pengine, Thread]),
    empty_queue,
    destroy_or_continue(abort(Pengine)).
%!  guarded_main_loop(+Pengine)
%
%   Wait for the next request and act on it.  The request `destroy`
%   terminates the pengine and ask(Goal, Options) starts a query.  Any
%   other request is answered with a `protocol_error` error event,
%   after which the loop waits for the next request.
guarded_main_loop(Pengine) :-
    pengine_request(Request),
    (   Request = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [Pengine, destroy]),
        pengine_terminate(Pengine)
    ;   Request = ask(Goal, AskOptions)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [Pengine, ask(Goal)]),
        ask(Pengine, Goal, AskOptions)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2',
              [Pengine, protocol_error]),
        pengine_reply(error(Pengine, error(protocol_error, _))),
        guarded_main_loop(Pengine)
    ).


%   pengine_terminate(+Pengine)
%
%   Send destroy(Pengine) to the parent and detach the calling thread.
pengine_terminate(Pengine) :-
    pengine_reply(destroy(Pengine)),
    thread_self(Thread),            % Make the thread silently disappear
    thread_detach(Thread).
%   solve(+Chunk, +Template, :Goal, +ID)
%
%   Solve Goal for pengine ID and report the results as events.
%   Chunk is either an integer (number of solutions per reply) or
%   `false`; any other value raises a domain error. Each batch of
%   answers is reported as success(ID, Result, Projection, CPUTime,
%   More). If more solutions may exist, more_solutions/4 is called to
%   wait for the next request; that predicate may fail to make us
%   backtrack into the goal. Failure and errors are reported via
%   destroy_or_continue/1, except for `abort_query`, which is
%   re-thrown.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)            % updated destructively by more_solutions/5
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Epoch),
    Time = time(Epoch),                 % start time; reset by more_solutions/5
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(CurrTypeIn),
    (   '$set_typein_module'(ID),
        % query_done/2 binds Det to `true` when the cleanup handler
        % runs; Det remains unbound while choice points are left.
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Result),
                           Error, true),
                     query_done(Det, CurrTypeIn)),
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%   query_done(-Det, +CurrTypeIn)
%
%   Cleanup handler of solve/4: unify Det with `true` and restore the
%   typein module to CurrTypeIn.

query_done(true, CurrTypeIn) :-
    '$set_typein_module'(CurrTypeIn).
%   set_projection(:Goal, +Bindings)
%
%   Store Bindings in the backtrackable global variable
%   `$variable_names` and then call Goal.

set_projection(Query, VarNames) :-
    b_setval('$variable_names', VarNames),
    call(Query).

%   projection(-Projection)
%
%   Projection is the list of variable names from the Name=Value
%   pairs held in `$variable_names`, or `[]` if that global variable
%   does not exist.

projection(Names) :-
    (   nb_current('$variable_names', VarNames)
    ->  maplist(var_name, VarNames, Names)
    ;   Names = []
    ).
%   filter_template(+Template0, +Bindings, -Template)
%
%   If Template0 is a dict with tag `swish_default_template`, rebuild
%   it from Bindings. Any other template is passed unchanged.

filter_template(In, VarNames, Out) :-
    (   is_dict(In, swish_default_template)
    ->  dict_create(Out, swish_default_template, VarNames)
    ;   Out = In
    ).

%   findnsols_no_empty(+State, ?Template, :Goal, -List)
%
%   With State `no_chunk`, List is the single-element list [Template]
%   and Goal is called as is. Otherwise State is handed to findnsols/4
%   and an empty result list is turned into failure.

findnsols_no_empty(no_chunk, Templ, Query, Answers) =>
    Answers = [Templ],
    call(Query).
findnsols_no_empty(Count, Templ, Query, Answers) =>
    findnsols(Count, Templ, Query, Answers),
    Answers \== [].

%   destroy_or_continue(+Event)
%
%   Send Event, whose first argument is the pengine id. If the pengine
%   has property destroy(true), the thread is detached and Event is
%   wrapped in destroy(Id, Event). Otherwise Event is sent as is and
%   we return to guarded_main_loop/1.

destroy_or_continue(Reply) :-
    arg(1, Reply, Pengine),
    (   pengine_property(Pengine, destroy(true))
    ->  thread_self(Thread),
        thread_detach(Thread),
        pengine_reply(destroy(Pengine, Reply))
    ;   pengine_reply(Reply),
        guarded_main_loop(Pengine)
    ).
%   more_solutions(+Pengine, +Choice, +State, +Time)
%
%   Called after a solution was reported while more may exist (the
%   debug messages call this state 6). Waits for the next request and
%   processes it:
%
%     - `stop`
%       Report stop(Pengine) via destroy_or_continue/1.
%     - `next`
%       Reset the start time and fail, so the caller backtracks into
%       the goal to produce the next chunk of solutions.
%     - next(Count)
%       As `next`, but sets the new chunk-size to Count. Only
%       accepted if Count > 0 and State is a count(_) term.
%     - ask(Goal, Options)
%       Cut back to Choice and ask the new goal.
%     - `destroy`
%       Terminate the pengine.
%
%   Any other request is answered with a `protocol_error`, after which
%   we wait for the next request.

more_solutions(Pengine, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, Pengine, Choice, State, Time).

more_solutions(stop, Pengine, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [Pengine, stop]),
    destroy_or_continue(stop(Pengine)).
more_solutions(next, Pengine, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, next]),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(next(Size), Pengine, _Choice, State, Time) :-
    Size > 0,
    State = count(_),           % else fallthrough to protocol error
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, next(Size)]),
    nb_setarg(1, State, Size),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(ask(Query, AskOptions), Pengine, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, ask(Query)]),
    prolog_cut_to(Choice),
    ask(Pengine, Query, AskOptions).
more_solutions(destroy, Pengine, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [Pengine, destroy]),
    pengine_terminate(Pengine).
more_solutions(Request, Pengine, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6', [Pengine, protocol_error(Request)]),
    pengine_reply(error(Pengine, error(protocol_error, _))),
    more_solutions(Pengine, Choice, State, Time).
%   ask(+Pengine, :Goal, +Options)
%
%   Prepare Goal using prepare_goal/4 and run it through solve/4. If
%   preparation raises an exception, the error is sent as a reply and
%   we return to guarded_main_loop/1. Options processed here:
%
%     - template(Template)
%       Answer template. Defaults to Goal.
%     - chunk(N)
%       Chunk size passed to solve/4. Defaults to 1, also when no
%       chunk(N) option is given.

ask(Pengine, Query, AskOptions) :-
    catch(prepare_goal(Pengine, Query, Prepared, AskOptions), Ball, true),
    !,
    (   nonvar(Ball)
    ->  pengine_reply(error(Pengine, Ball)),
        guarded_main_loop(Pengine)
    ;   option(template(Templ), AskOptions, Query),
        option(chunk(Size), AskOptions, 1),
        solve(Size, Templ, Prepared, Pengine)
    ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
%   prepare_goal(+ID, +Goal0, -Goal, +Options)
%
%   Prepare Goal0 for execution in pengine ID. The result is a goal
%   qualified with the pengine's module. Steps:
%
%     1. Store the bindings(Bindings) option (default `[]`) in the
%        global variable `$variable_names`.
%     2. Give the prepare_goal/3 hook a chance to rewrite the goal;
%        if it fails, the goal is used as is.
%     3. Run expand_goal/2 with the pengine's module as source module.
%     4. Unless the pengine is not sandboxed, validate the goal with
%        safe_goal/1 under the time limit given by the application's
%        `safe_goal_limit` setting. Exceeding that limit raises
%        error(sandbox(time_limit_exceeded, Limit), _).

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Goal1, Options)
    ->  true
    ;   Goal1 = Goal0
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(Old, Module),
        expand_goal(Goal1, Goal),
        '$set_source_module'(_, Old)),
    % Note: this reads as (A -> B ; C -> D). If the sandbox branch
    % fails, prepare_goal/4 fails as there is no final else.
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, App),
        setting(App:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), E, true)
    ->  (   var(E)
        ->  true
        ;   E = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(E)
        )
    ).
not_sandboxed(User, Application)
must succeed.
%   pengine_not_sandboxed(+Pengine)
%
%   True when the not_sandboxed/2 hook succeeds for the user and the
%   application of Pengine. Succeeds at most once.

pengine_not_sandboxed(Pengine) :-
    pengine_user(Pengine, Who),
    pengine_property(Pengine, application(Application)),
    not_sandboxed(Who, Application),
    !.
%   pengine_pull_response(+Pengine, +Options)
%
%   If Pengine is a remote pengine, ask its server for the next
%   response. For any other pengine this is a no-op.

pengine_pull_response(Pengine, Options) :-
    (   pengine_remote(Pengine, BaseURL)
    ->  remote_pengine_pull_response(BaseURL, Pengine, Options)
    ;   true
    ).
%   pengine_input(+Prompt, -Term)
%
%   Send prompt(Self, Prompt) to the parent of the calling pengine and
%   wait for the next request. An input(Input) request unifies Term
%   with Input, `destroy` calls abort/0 and anything else raises a
%   `protocol_error`.

pengine_input(Prompt, Term) :-
    pengine_self(Me),
    pengine_parent(Master),
    pengine_reply(Master, prompt(Me, Prompt)),
    pengine_request(Msg),
    (   Msg = input(Answer)
    ->  Term = Answer
    ;   Msg == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
%   pengine_respond(+Pengine, +Input, +Options)
%
%   Send the event input(Input) to Pengine using pengine_send/3.

pengine_respond(Target, Answer, SendOptions) :-
    pengine_send(Target, input(Answer), SendOptions).
%   send_error(+Error)
%
%   Send Error as an error event of the calling pengine. If the
%   context holds a list of stack frames, the backtrace is printed to
%   a string and sent in place of the frames. Blobs in the error term
%   are replaced using replace_blobs/2.

send_error(error(Formal0, context(prolog_stack(StackFrames), Msg0))) :-
    is_list(StackFrames),
    !,
    with_output_to(string(Backtrace),
                   print_prolog_backtrace(current_output, StackFrames)),
    pengine_self(Me),
    replace_blobs(Formal0, Formal),
    replace_blobs(Msg0, Msg),
    pengine_reply(error(Me, error(Formal,
                                  context(prolog_stack(Backtrace), Msg)))).
send_error(Ball0) :-
    pengine_self(Me),
    replace_blobs(Ball0, Ball),
    pengine_reply(error(Me, Ball)).
%   replace_blobs(+Term0, -Term)
%
%   Term is a copy of Term0 in which every blob whose type is not
%   `text` is replaced by an atom holding its print/1 representation.

replace_blobs(Blob, Atom) :-
    blob(Blob, Type), Type \== text,
    !,
    format(atom(Atom), '~p', [Blob]).
replace_blobs(Term0, Term) :-
    compound(Term0),
    !,
    compound_name_arguments(Term0, Name, Args0),
    maplist(replace_blobs, Args0, Args),
    compound_name_arguments(Term, Name, Args).
replace_blobs(Term, Term).


/*================= Remote pengines =======================
*/


%   remote_pengine_create(+BaseURL, +Options)
%
%   Create a pengine on the server at BaseURL. Options that satisfy
%   pengine_create_option/1 are posted to the server; the remaining
%   options are passed to the HTTP request. If there is an ask(Query)
%   option but no template, Query is used as template. The reply of
%   the server is forwarded to the message queue of the calling
%   thread.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
    (   option(ask(Query), PengineOptions0),
        \+ option(template(_Template), PengineOptions0)
    ->  PengineOptions = [template(Query)|PengineOptions0]
    ;   PengineOptions = PengineOptions0
    ),
    options_to_dict(PengineOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
    arg(1, Reply, ID),
    (   option(id(ID2), Options)
    ->  ID = ID2
    ;   true
    ),
    option(alias(Name), Options, ID),
    assert(child(Name, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), PengineOptions, pengine_sandbox),
        option(destroy(Destroy), PengineOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%   options_to_dict(+Options, -Dict)
%
%   Translate the create options into a dict for posting. If both
%   ask(Ask) and template(Template) are present, they are written
%   together such that they share variable names. The findall/3 call
%   makes sure the numbervars/3 bindings do not affect the original
%   terms.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Options2),
    !,
    no_numbered_var_in(Ask+Template),
    findall(AskString-TemplateString,
            ask_template_to_strings(Ask, Template, AskString, TemplateString),
            [ AskString-TemplateString ]),
    options_to_dict(Options2, Dict0),
    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Options1),
    dict_create(Dict, _, Options1).

%   no_numbered_var_in(+Term)
%
%   Raise a domain error if Term has a sub term of the shape
%   '$VAR'(_), as that would clash with numbervars/3 below.

no_numbered_var_in(Term) :-
    sub_term(Sub, Term),
    subsumes_term('$VAR'(_), Sub),
    !,
    domain_error(numbered_vars_free_term, Term).
no_numbered_var_in(_).

%   ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Write Ask and Template after numbering their variables in a
%   single pass, so a shared variable gets the same name in both
%   strings.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
                                            Template, WOpts
                                          ]),
    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).

%   prolog_option(+Option0, -Option)
%
%   Options of type `term` (see create_option_type/2) get their value
%   replaced by its write_canonical (`~k`) string. Other options are
%   passed unchanged.

prolog_option(Option0, Option) :-
    create_option_type(Option0, term),
    !,
    Option0 =.. [Name,Value],
    format(string(String), '~k', [Value]),
    Option =.. [Name,String].
prolog_option(Option, Option).

%   create_option_type(?Option, ?Type)
%
%   Type of the value of a create option that needs conversion.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).

%   remote_pengine_send(+BaseURL, +ID, +Event, +Options)
%
%   Send Event to the remote pengine ID and forward the reply to the
%   message queue of the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%   remote_pengine_pull_response(+BaseURL, +ID, +Options)
%
%   Request the next response of the remote pengine ID and forward it
%   to the message queue of the calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%   remote_pengine_abort(+BaseURL, +ID, +Options)
%
%   Ask the server to abort the remote pengine ID and forward the
%   reply to the message queue of the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).
%   remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Issue the HTTP request for Action on pengine ID at Server and
%   read the reply as a Prolog term. If Params is exactly
%   [event=Event], Event is posted as a Prolog document. Otherwise
%   Params are added to the query string of the URL. The stream is
%   closed after reading the reply.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
              [ post(prolog(Event))     % makes it impossible to interrupt.
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(Stream, Reply),
        close(Stream)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, Stream, Options),
    call_cleanup(
        read_prolog_reply(Stream, Reply),
        close(Stream)).

%   remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   Post Data as JSON to the URL for Action at Server and read the
%   reply as a Prolog term. probe/2 is called on the URL first.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL),
    http_open(URL, Stream,
              [ post(json(Data))
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(Stream, Reply),
        close(Stream)).
%   probe(+Action, +URL)
%
%   For the `create` action, issue an HTTP OPTIONS request on URL and
%   close the resulting stream. Other actions are not probed.

probe(Action, URL) :-
    (   Action = create
    ->  http_open(URL, In, [method(options)]),
        close(In)
    ;   true
    ).

%   read_prolog_reply(+In, -Reply)
%
%   Read a Prolog term from the stream In using UTF-8 encoding and
%   apply rebind_cycles/2 to the result.

read_prolog_reply(Stream, Reply) :-
    set_stream(Stream, encoding(utf8)),
    read(Stream, Raw),
    rebind_cycles(Raw, Reply).

%   rebind_cycles(+Reply0, -Reply)
%
%   If Reply0 is a term @(Reply, Bindings) where Bindings is a list
%   of Var = Value terms, perform the unifications. Any other term is
%   returned unchanged.

rebind_cycles(@(Term, Eqs), Term) :-
    is_list(Eqs),
    !,
    maplist(bind, Eqs).
rebind_cycles(Term, Term).

bind(Value = Value).

%   server_url(+Server, +Action, +Params, -URL)
%
%   URL is Server with its path set to pengine/Action and its query
%   string set to Params.

server_url(BaseURL, Action, Query, URL) :-
    atom_concat('pengine/', Action, Path),
    uri_edit([ path(Path),
               search(Query)
             ], BaseURL, URL).
%   pengine_event(?Event) is det.
%   pengine_event(?Event, +Options) is det.
%
%   Get the next pengine event from the message queue of the calling
%   thread. Valid options are:
%
%     - timeout(Time)
%       Passed to thread_get_message/3. If no message arrives in
%       time, Event is unified with the atom `timeout`.
%     - listen(Id)
%       Only consider events whose pengine id unifies with Id.

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    thread_self(Me),
    option(listen(Pengine), Options, _),
    (   thread_get_message(Me, pengine_event(Pengine, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).

%   update_remote_destroy(+Event)
%
%   If Event implies destruction of a remote pengine, unregister that
%   pengine.

update_remote_destroy(Event) :-
    (   destroy_event(Event),
        arg(1, Event, Pengine),
        pengine_remote(Pengine, _BaseURL)
    ->  pengine_unregister_remote(Pengine)
    ;   true
    ).

%   destroy_event(+Event)
%
%   True if Event is a destroy event or a create event whose
%   answer(Answer) feature holds a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Props)) :-
    memberchk(answer(First), Props),
    !,
    nonvar(First),
    destroy_event(First).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
%   pengine_event_loop(:Closure, +Options)
%
%   Process pengine events for as long as child/2 holds facts. Each
%   event is passed to Closure using call(Closure, Event). With the
%   option autoforward(all), every event is also sent to all
%   children.

pengine_event_loop(Closure, Options) :-
    child(_,_),
    !,
    pengine_event(Event),
    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
    ->  forall(child(_,ID), pengine_send(ID, Event))
    ;   true
    ),
    pengine_event_loop(Event, Closure, Options).
pengine_event_loop(_, _).

% Closure is called with one additional argument (the event); the
% third argument (Continue) is output.
:- meta_predicate
    pengine_process_event(+, 1, -, +).

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue == true
    ->  pengine_event_loop(Closure, Options)
    ;   true
    ).

%   pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Process a single event. Closure is called on the event and,
%   except for error events, its failure is ignored. A create event
%   with an answer(First) feature is split into the create event and
%   First. Output and debug events trigger pengine_pull_response/2.
%   If Closure fails on an error event, all children are destroyed
%   and the error is re-thrown. A destroy(ID) event removes ID from
%   child/2.

pengine_process_event(create(ID, T), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
    (   select(answer(First), T, T1)
    ->  ignore(call(Closure, create(ID, T1))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, T))),
        Continue = true
    ).
pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
    ignore(call(Closure, output(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
    ignore(call(Closure, debug(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
    ignore(call(Closure, prompt(ID, Term))).
pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
    ignore(call(Closure, success(ID, Sol, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_,Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
    pengine_process_event(Event, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_,ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
%   pengine_rpc(+URL, +Query) is nondet.
%   pengine_rpc(+URL, +Query, +Options) is nondet.
%
%   Run Query on a pengine created on the server at URL and enumerate
%   its solutions on backtracking. The template is a term v(...)
%   holding the variables of Query. If Options has no timeout(_)
%   option, the `time_limit` setting is used as timeout. The pengine
%   is destroyed on completion unless the server has already reported
%   its destruction.

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, M:Options0) :-
    translate_local_sources(Options0, Options1, M),
    (  option(timeout(_), Options1)
    -> Options = Options1
    ;  setting(time_limit, Limit),
       Options = [timeout(Limit)|Options1]
    ),
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    setup_call_catcher_cleanup(
        pengine_create([ ask(Query),
                         template(Template),
                         server(URL),
                         id(Id)
                       | Options
                       ]),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why)).

%   pengine_destroy_and_wait(+State, +Id, +Why)
%
%   Cleanup handler of pengine_rpc/3. If State is still destroy(true),
%   destroy the pengine and wait for the confirmation.

pengine_destroy_and_wait(destroy(true), Id, Why) :-
    !,
    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
    pengine_destroy(Id),
    wait_destroy(Id, 10).
pengine_destroy_and_wait(_, _, Why) :-
    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).

%   wait_destroy(+Id, +N)
%
%   Wait until pengine Id is no longer registered in child/2. Reads
%   at most N+1 events, each with a timeout of 10. If no destroy
%   event shows up, the pengine is unregistered anyway.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, N) :-
    pengine_event(Event, [listen(Id),timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_,Id))
    ;   succ(N1, N)                     % fails if N is 0
    ->  wait_destroy(Id, N1)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_,Id))
    ).

%   wait_event(?Template, +State, +Options)
%
%   Wait for the next event and handle it using process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Event, Options),
    debug(pengine(event), 'Received ~p', [Event]),
    process_event(Event, Template, State, Options).

%   process_event(+Event, ?Template, +State, +Options)
%
%   Handle Event for pengine_rpc/3. Solutions are unified with
%   Template, one at a time on backtracking. If the server indicates
%   there may be more solutions, the next chunk is requested after
%   the current chunk is exhausted.

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_ID, Error), _Template, _, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _, _Options) :-
    fail.
process_event(prompt(ID, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(ID, Prompt, Reply),
    pengine_send(ID, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(ID, Term), Template, State, Options) :-
    pengine_rpc_output(ID, Term),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Solutions, _Proj, _Time, false),
              Template, _, _Options) :-
    !,
    member(Template, Solutions).
process_event(success(ID, Solutions, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Event), Template, State, Options) :-
    !,
    retractall(child(_,ID)),
    nb_setarg(1, State, false),         % tell the cleanup not to destroy again
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Event, Template, State, Options).
% compatibility with older versions of the protocol.
process_event(success(ID, Solutions, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Solutions, _Proj, Time, More),
                  Template, State, Options).


%   pengine_rpc_prompt(+ID, +Prompt, -Term)
%
%   Get the reply for a prompt event. First tries the prompt/3 hook.
%   If that fails, Term is read using read/1 with Prompt as prompt.

pengine_rpc_prompt(ID, Prompt, Term) :-
    prompt(ID, Prompt, Term0),
    !,
    Term = Term0.
pengine_rpc_prompt(_ID, Prompt, Term) :-
    setup_call_cleanup(
        prompt(Old, Prompt),
        read(Term),
        prompt(_, Old)).

%   pengine_rpc_output(+ID, +Term)
%
%   Handle an output event. First tries the output/2 hook. If that
%   fails, Term is printed using print/1.

pengine_rpc_output(ID, Term) :-
    output(ID, Term),
    !.
pengine_rpc_output(_ID, Term) :-
    print(Term).
%   prompt(+ID, +Prompt, -Term)
%
%   Hook called by pengine_rpc_prompt/3 to answer a prompt event. If
%   it fails, the answer is read using read/1.

:- multifile prompt/3.
%   output(+ID, +Term)
%
%   Hook called by pengine_rpc_output/2 to handle an output event. If
%   it fails, Term is printed using print/1.

:- multifile output/2.


/*================= HTTP handlers =======================
*/

%   Declare  HTTP locations  we serve  and how.  Note that  we use
%   time_limit(infinite) because pengines have their own timeout. Also
%   note that we use spawn. This is needed because we can easily get
%   many clients waiting for  some  action   on  a  pengine to complete.
%   Without spawning, we would quickly exhaust   the  worker pool of the
%   HTTP server.
%
%   FIXME: probably we should wait for a   short time for the pengine on
%   the default worker thread. Only if  that   time  has expired, we can
%   call http_spawn/2 to continue waiting on   a  new thread. That would
%   improve the performance and reduce the usage of threads.

:- http_handler(root(pengine),               http_404([]),
                [ id(pengines) ]).
:- http_handler(root(pengine/create),        http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/send),          http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/abort),         http_pengine_abort,         []).
:- http_handler(root(pengine/detach),        http_pengine_detach,        []).
:- http_handler(root(pengine/list),          http_pengine_list,          []).
:- http_handler(root(pengine/ping),          http_pengine_ping,          []).
:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []).

:- http_handler(root(pengine/'pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []), []).
:- http_handler(root(pengine/'plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
solutions | chunked | If `all`, emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
Using chunk=false
simulates the recursive toplevel. See
pengine_ask/3.
%   http_pengine_create(+Request)
%
%   HTTP handler that creates a pengine. The parameters are accepted
%   as a JSON document if the content type starts with
%   `application/json` and as HTTP parameters otherwise. Both
%   variants end in http_pengine_create/4.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Params),
    dict_atom_option(format, Params, Format, prolog),
    dict_atom_option(application, Params, App, pengine_sandbox),
    http_pengine_create(Request, App, Format, Params).
http_pengine_create(Request) :-
    Opt = [optional(true)],
    OptStr = [string|Opt],
    Decls = [ format(Format, [default(prolog)]),
              application(App, [default(pengine_sandbox)]),
              chunk(_, [nonneg;oneof([false]), default(1)]),
              solutions(_, [oneof([all,chunked]), default(chunked)]),
              ask(_, OptStr),
              template(_, OptStr),
              src_text(_, OptStr),
              disposition(_, OptStr),
              src_url(_, Opt)
            ],
    http_parameters(Request, Decls),
    form_dict(Decls, Params),
    http_pengine_create(Request, App, Format, Params).

%   dict_atom_option(+Key, +Dict, -Atom, +Default)
%
%   Atom is the value of Key in Dict converted to an atom, or Default
%   if Dict has no such key.

dict_atom_option(Key, Params, Atom, Default) :-
    (   get_dict(Key, Params, Text)
    ->  atom_string(Atom, Text)
    ;   Atom = Default
    ).

%   form_dict(+Form, -Dict)
%
%   Dict holds Name-Value for each parameter declaration in Form
%   whose value is bound.

form_dict(Decls, Dict) :-
    form_values(Decls, NameValues),
    dict_pairs(Dict, _, NameValues).

form_values([], []).
form_values([Decl|Decls], Pairs) :-
    (   arg(1, Decl, Value),
        nonvar(Value)
    ->  functor(Decl, Name, _),
        Pairs = [Name-Value|Rest]
    ;   Pairs = Rest
    ),
    form_values(Decls, Rest).
%   http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for Application from the parameters in Dict and
%   reply with its first result in Format. If Application does not
%   exist, reply with an existence error.

http_pengine_create(Request, Application, Format, Dict) :-
    current_application(Application),
    !,
    allowed(Request, Application),
    authenticate(Request, Application, UserOptions),
    dict_to_options(Dict, Application, CreateOptions0),
    append(UserOptions, CreateOptions0, CreateOptions),
    pengine_uuid(Pengine),
    message_queue_create(Queue, [max_size(25)]),
    setting(Application:time_limit, TimeLimit),
    get_time(Now),
    % register the queue before creating the pengine
    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
    broadcast(pengine(create(Pengine, Application, CreateOptions))),
    create(Queue, Pengine, CreateOptions, http, Application),
    create_wait_and_output_result(Pengine, Queue, Format,
                                  TimeLimit, Dict),
    gc_abandoned_queues.
http_pengine_create(_Request, Application, Format, _Dict) :-
    Error = existence_error(pengine_application, Application),
    pengine_uuid(ID),
    output_result(Format, error(ID, error(Error, _))).


%   dict_to_options(+Dict, +Application, -CreateOptions)
%
%   Translate the key-value pairs of Dict into pengine create
%   options. Keys that are not valid create options are dropped, as
%   is the key `user`.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions).

pairs_create_options([], _, []) :- !.
pairs_create_options([N-V0|T0], App, [Opt|T]) :-
    Opt =.. [N,V],
    pengine_create_option(Opt), N \== user,
    !,
    (   create_option_type(Opt, atom)
    ->  atom_string(V, V0)               % term creation must be done if
    ;   V = V0                           % we created the source and know
    ),                                   % the operators.
    pairs_create_options(T0, App, T).
pairs_create_options([_|T0], App, T) :-
    pairs_create_options(T0, App, T).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded, _).
%   wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit)
%
%   Wait for at most TimeLimit for a pengine event on Queue and
%   output it in Format. If waiting raises an exception, the reply is
%   died(Pengine). If no event arrives in time,
%   time_limit_exceeded/2 is called.

wait_and_output_result(ID, Queue, Format, MaxTime) :-
    (   catch(thread_get_message(Queue, pengine_event(_, Reply),
                                 [ timeout(MaxTime)
                                 ]),
              Ball, true)
    ->  (   nonvar(Ball)
        ->  output_result(Format, died(ID))
        ;   debug(pengine(wait), 'Got ~q from ~q', [Reply, Queue]),
            ignore(destroy_queue_from_http(ID, Reply, Queue)),
            protect_pengine(ID, output_result(Format, Reply))
        )
    ;   time_limit_exceeded(ID, Format)
    ).
%   create_wait_and_output_result(+Pengine, +Queue, +Format,
%                                 +TimeLimit, +Dict)
%
%   Wait for the result of a newly created pengine and output it. If
%   Dict holds `solutions: all`, all results are emitted as
%   page(Page, Event) terms through output_result/3: as long as the
%   pengine reports there are more solutions, it is sent a `next`
%   request and we loop by failing into between/3. Otherwise this is
%   the same as wait_and_output_result/4. Dict is passed to
%   output_result/3.
%
%   NOTE(review): Dict may contain a `disposition` key to denote the
%   download location; that key is not used here. Confirm against
%   output_result/3.

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, Page),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict))
            ;   is_more_event(Event)
            ->  pengine_thread(Pengine, Thread),
                thread_send_message(Thread, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict)),
                fail                    % next page
            ;   !,
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict))
            )
        ;   !, output_result(Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit).

%   is_more_event(+Event)
%
%   True if Event is a success event that indicates more solutions
%   may exist, or a create event whose answer is such an event.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_, Options)) :-
    memberchk(answer(Event), Options),
    is_more_event(Event).
%   time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine. The destroy event wrapping a
%   `time_limit_exceeded` error is output as cleanup, so it is also
%   emitted if destroying fails or raises an exception.

time_limit_exceeded(ID, Format) :-
    Reply = destroy(ID, error(ID, time_limit_exceeded)),
    call_cleanup(
        pengine_destroy(ID, [force(true)]),
        output_result(Format, Reply)).
%   destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
%
%   Consider destroying the output Queue of Pengine after Event was
%   read from it. If the queue is registered in output_queue/3, it is
%   destroyed when empty. Otherwise, if Event is a destroy event, the
%   decision is delegated to sync_destroy_queue_from_http/2 while
%   holding the mutex `pengine`. Fails in all other cases.

destroy_queue_from_http(Pengine, _, Queue) :-
    output_queue(Pengine, Queue, _),
    !,
    destroy_queue_if_empty(Queue).
destroy_queue_from_http(Pengine, Reply, Queue) :-
    debug(pengine(destroy), 'DESTROY? ~p', [Reply]),
    is_destroy_event(Reply),
    !,
    message_queue_property(Queue, size(Pending)),
    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
    with_mutex(pengine, sync_destroy_queue_from_http(Pengine, Queue)).

%   is_destroy_event(+Event)
%
%   True if Event is a destroy event or a create event whose answer
%   is a destroy event.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Props)) :-
    memberchk(answer(First), Props),
    is_destroy_event(First).

%   destroy_queue_if_empty(+Queue)
%
%   Destroy Queue and remove its output_queue/3 registration, unless
%   it still holds a message.

destroy_queue_if_empty(Queue) :-
    (   thread_peek_message(Queue, _)
    ->  true
    ;   retractall(output_queue(_, Queue, _)),
        message_queue_destroy(Queue)
    ).
:- dynamic
    last_gc/1.

%   gc_abandoned_queues
%
%   If consider_queue_gc/0 holds, destroy every queue that has been
%   registered in output_queue/3 for more than 15 minutes and record
%   the time of this collection in last_gc/1. Always succeeds.

gc_abandoned_queues :-
    consider_queue_gc,
    !,
    get_time(Now),
    (   output_queue(_, Q, Stamp),
        Now-Stamp > 15*60,
        retract(output_queue(_, Q, Stamp)),
        message_queue_destroy(Q),
        fail
    ;   retractall(last_gc(_)),
        asserta(last_gc(Now))
    ).
gc_abandoned_queues.

%   consider_queue_gc
%
%   True if output_queue/3 has more than 100 clauses and the last
%   collection was more than 5 minutes ago or never happened.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Last),
        get_time(Now),
        Now-Last > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
:- dynamic output_queue_destroyed/1.

%   sync_destroy_queue_from_http(+Pengine, +Queue)
%
%   Called by destroy_queue_from_http/3 while holding the mutex
%   `pengine`. Three cases:
%
%     1. Queue is registered in output_queue/3: destroy it if empty.
%     2. Queue still holds an output event: register it in
%        output_queue/3 and delay its destruction.
%     3. Otherwise destroy Queue and record this in
%        output_queue_destroyed/1.

sync_destroy_queue_from_http(Pengine, Q) :-
    (   output_queue(Pengine, Q, _)
    ->  destroy_queue_if_empty(Q)
    ;   thread_peek_message(Q, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
              [Q]),
        get_time(Stamp),
        asserta(output_queue(Pengine, Q, Stamp))
    ;   message_queue_destroy(Q),
        asserta(output_queue_destroyed(Q))
    ).
%   sync_destroy_queue_from_pengine(+ID, +Queue)
%
%   Unregister Queue as the queue of pengine ID. If the HTTP side has
%   already destroyed the queue (output_queue_destroyed/1), that
%   record is removed. Otherwise the queue is registered in
%   output_queue/3 with the current time.
%
%   NOTE(review): presumably called with the mutex `pengine` held;
%   confirm against the caller.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Now),
        asserta(output_queue(ID, Queue, Now))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).


%   http_pengine_send(+Request)
%
%   HTTP handler that sends an event to a pengine and replies with
%   the next result of that pengine. If the pengine does not exist,
%   the reply is an existence error; other errors from reading the
%   event are returned as error(ID, Error).

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(ID, Request, Format, EventString, Event),
          Error,
          true),
    (   var(Error)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(ID, Thread)
        ->  pengine_queue(ID, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(ID, Event))),
            thread_send_message(Thread, pengine_request(Event)),
            wait_and_output_result(ID, Queue, Format, TimeLimit)
        ;   atom(ID)
        ->  pengine_died(Format, ID)
        ;   http_404([], Request)
        )
    ;   Error = error(existence_error(pengine, ID), _)
    ->  pengine_died(Format, ID)
    ;   output_result(Format, error(ID, Error))
    ).

%   pengine_died(+Format, +Pengine)
%
%   Reply with an existence error for Pengine.

pengine_died(Format, Pengine) :-
    output_result(Format, error(Pengine,
                                error(existence_error(pengine, Pengine),_))).
%   read_event(+Pengine, +Request, +Format, +EventString, -Event)
%
%   Read the event sent to Pengine. The event is read in the module
%   of the pengine while running under protect_pengine/2, after which
%   fix_bindings/4 adapts it to Format. If this fails, the pengine is
%   assumed to be gone: posted data is discarded and an existence
%   error is raised.
%
%   NOTE(review): protect_pengine/2 presumably uses the
%   `pengine_done` mutex; confirm against its definition.

read_event(ID, Request, Format, Text, Event) :-
    protect_pengine(
        ID,
        (   get_pengine_module(ID, M),
            read_event_2(Request, Text, M, Raw, VarNames)
        )),
    !,
    fix_bindings(Format, Raw, VarNames, Event).
read_event(ID, Request, _Format, _Text, _Event) :-
    debug(pengine(event), 'Pengine ~q vanished', [ID]),
    discard_post_data(Request),
    existence_error(pengine, ID).
%   read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the event, which is passed either through the `event`
%   parameter or as a posted document. If EventString is bound, it is
%   parsed as a term. Otherwise the request must be a POST, whose
%   body is read as `application/x-prolog`. Terms are read in Module
%   and Bindings holds the variable names.

read_event_2(_Request, Text, M, Event, VarNames) :-
    nonvar(Text),
    !,
    term_string(Event, Text,
                [ variable_names(VarNames),
                  module(M)
                ]).
read_event_2(Request, _Text, M, Event, VarNames) :-
    option(method(post), Request),
    http_read_data(Request, Event,
                   [ content_type('application/x-prolog'),
                     module(M),
                     variable_names(VarNames)
                   ]).
%   discard_post_data(+Request)
%
%   If Request is a POST request, read its data into a null stream.
%   For other requests this is a no-op.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
%   fix_bindings(+Format, +Event0, +Bindings, -Event)
%
%   For an ask(Goal, Options) event in a JSON format (`json` or
%   `json-...`, e.g. json(-s)), generate the template from the
%   variables in the asked Goal. Variables starting with an
%   underscore, followed by a capital letter are ignored from the
%   template. The resulting options hold the template, the chunk size
%   (default 1) and the named bindings. Other events and formats are
%   passed unchanged.

fix_bindings(Format,
             ask(Query, Opts0), VarNames,
             ask(Query, Opts)) :-
    json_lang(Format),
    !,
    exclude(anon, VarNames, Named),
    template(Named, Templ, Opts0, Opts1),
    select_option(chunk(Size), Opts1, Opts2, 1),
    Opts = [ template(Templ),
             chunk(Size),
             bindings(Named)
           | Opts2
           ].
fix_bindings(_, Event, _, Event).

%   template(+Bindings, -Template, +Options0, -Options)
%
%   Take Template from the template(_) option, removing it from the
%   options. Without such an option, Template is a dict with tag
%   `swish_default_template` created from Bindings.

template(_, Templ, Opts0, Opts) :-
    select_option(template(Templ), Opts0, Opts),
    !.
template(VarNames, Templ, Opts, Opts) :-
    dict_create(Templ, swish_default_template, VarNames).

%   anon(+Binding)
%
%   True if the name in Name=Var starts with `_` followed by a
%   character of type `prolog_var_start`.

anon(VarName=_) :-
    sub_atom(VarName, 0, _, _, '_'),
    sub_atom(VarName, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

%   var_name(+Binding, -Name)
%
%   Name is the name part of a Name=Var binding.

var_name(VarName=_, VarName).
%!  json_lang(+Format) is semidet.
%
%   True if Format is `json` or a name that starts with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler that waits for and outputs the next result of the
%   pengine identified by the `id` parameter. The `format` parameter
%   defaults to `prolog`.
%
%   The first clause lets reply_options/2 answer an OPTIONS request.
%   Otherwise the pengine is reattached and the queue is taken from
%   pengine_queue/4 or, failing that, from output_queue/3 with a
%   time limit of 0. If neither knows the id, a 404 page is
%   returned.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(Id, []),
                      format(Format, [default(prolog)])
                    ]),
    reattach(Id),
    (   (   pengine_queue(Id, OutQueue, MaxWait, _)
        ->  true
        ;   output_queue(Id, OutQueue, _),
            MaxWait = 0
        )
    ->  wait_and_output_result(Id, OutQueue, Format, MaxWait)
    ;   http_404([], Request)
    ).
%!  http_pengine_abort(+Request)
%
%   HTTP handler that aborts the pengine identified by the `id`
%   parameter. The first clause lets reply_options/2 answer an
%   OPTIONS request.
%
%   If the pengine has a thread, pengine(abort(Id)) is broadcast,
%   threads that are sending output for it are signalled through
%   abort_pending_output/1, the pengine is aborted and JSON `true`
%   is returned. Otherwise a 404 page is returned.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request,
                    [ id(Id, [])
                    ]),
    (   pengine_thread(Id, _)
    ->  broadcast(pengine(abort(Id))),
        abort_pending_output(Id),
        pengine_abort(Id),
        reply_json(true)
    ;   http_404([], Request)
    ).
%!  http_pengine_detach(+Request)
%
%   HTTP handler that detaches the client from the pengine
%   identified by the `id` parameter. The posted JSON document is
%   stored, extended with the current time under key `time`, in
%   pengine_detached/2.
%
%   The pengine must exist and the request must pass allowed/2 and
%   authenticate/3 for the pengine's application; otherwise a 404
%   page is returned. On success the queue size limit is raised to
%   1000, detached(Id) is sent to the queue and JSON `true` is
%   returned.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request,
                    [ id(Id, [])
                    ]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(Id, application(App)),
        allowed(Request, App),
        authenticate(Request, App, _)
    ->  broadcast(pengine(detach(Id))),
        get_time(DetachedAt),
        assertz(pengine_detached(Id, ClientData.put(time, DetachedAt))),
        pengine_queue(Id, OutQueue, _, _),
        message_queue_set(OutQueue, max_size(1000)),
        pengine_reply(OutQueue, detached(Id)),
        reply_json(true)
    ;   http_404([], Request)
    ).

% Fallback for systems that do not provide message_queue_set/2:
% changing queue properties is then a no-op.

:- if(\+current_predicate(message_queue_set/2)).
message_queue_set(_,_).
:- endif.

%!  reattach(+ID) is semidet.
%
%   If ID is registered as detached and still has a queue, remove
%   the registration and set the queue size limit to 25. Succeeds
%   without doing anything if that is not the case.

reattach(ID) :-
    retract(pengine_detached(ID, _)),
    pengine_queue(ID, OutQueue, _, _),
    !,
    message_queue_set(OutQueue, max_size(25)).
reattach(_).
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler that destroys a set of pengines. The `ids`
%   parameter is a comma-separated list of pengine ids. Every listed
%   pengine that is not registered in pengine_detached/2 is
%   destroyed using the option force(true). Replies with the JSON
%   string "ok".

http_pengine_destroy_all(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_destroy_all(Request) :-
    http_parameters(Request,
                    [ ids(IdList, [])
                    ]),
    atomic_list_concat(Ids, ',', IdList),
    forall(( member(Id, Ids),
             \+ pengine_detached(Id, _)
           ),
           pengine_destroy(Id, [force(true)])),
    reply_json("ok").
%!  http_pengine_ping(+Request)
%
%   HTTP handler that tests whether the pengine given by the `id`
%   parameter is still alive. If the pengine has a thread and
%   thread_statistics/2 succeeds on it, the event ping(Pengine,
%   Stats) is sent, where Stats is the return of
%   thread_statistics/2. Otherwise the event died(Pengine) is sent.
%   The `format` parameter defaults to `prolog`.

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Thread),
        catch(thread_statistics(Thread, Stats), error(_,_), fail)
    ->  Reply = ping(Pengine, Stats)
    ;   Reply = died(Pengine)
    ),
    output_result(Format, Reply).
%!  http_pengine_list(+Request)
%
%   HTTP handler that lists pengines as a JSON object with the key
%   `pengines`. The `status` parameter only accepts `detached`,
%   which is also its default. The `application` parameter defaults
%   to `pengine_sandbox`. The request must pass allowed/2 and
%   authenticate/3 for the application.

http_pengine_list(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_list(Request) :-
    http_parameters(Request,
                    [ status(Status, [default(detached), oneof([detached])]),
                      application(App, [default(pengine_sandbox)])
                    ]),
    allowed(Request, App),
    authenticate(Request, App, _),
    findall(Info, listed_pengine(App, Status, Info), Infos),
    reply_json(json{pengines: Infos}).

%   listed_pengine(+Application, +Status, -State) is nondet.
%
%   State is a dict describing a detached pengine of Application:
%   its id, the value of its detached(Time) property, the number of
%   queued messages and the thread statistics. If the statistics
%   cannot be obtained, `stats` is thread{status:died}.

listed_pengine(App, detached, State) :-
    State = pengine{id:Id,
                    detached:Since,
                    queued:Pending,
                    stats:Stats},

    pengine_property(Id, application(App)),
    pengine_property(Id, detached(Since)),
    pengine_queue(Id, OutQueue, _, _),
    message_queue_property(OutQueue, size(Pending)),
    (   pengine_thread(Id, Thread),
        catch(thread_statistics(Thread, Stats), _, fail)
    ->  true
    ;   Stats = thread{status:died}
    ).
%!  output_result(+Format, +Event) is det.
%!  output_result(+Format, +Event, +OptionsDict) is det.
%
%   Formulate an HTTP response from Event. Format is one of
%   `prolog`, `json` or `json-s`.
%
%   While the reply is being written, the replying thread is
%   registered in pengine_replying/2 so that abort_pending_output/1
%   can find it. The exception `pengine_abort_output` raised during
%   output is caught and silently ends the reply.
%
%   @error domain_error(pengine_format, Format) if no clause can
%   handle Format.

:- dynamic
    pengine_replying/2.             % +Pengine, +Thread

output_result(Format, Event) :-
    arg(1, Event, Pengine),
    thread_self(Me),
    cors_enable,            % contingent on http:cors setting
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), Ref),
        catch(output_result(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(Ref)).

% The write_result/3 call gets the first chance to emit the reply.
output_result(Format, Event, Dict) :-
    write_result(Format, Event, Dict),
    !.
output_result(prolog, Event, _) :-
    !,
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    write_term(Event,
               [ quoted(true),
                 ignore_ops(true),
                 fullstop(true),
                 blobs(portray),
                 portray_goal(portray_blob),
                 nl(true)
               ]).
output_result(Format, Event, _) :-
    json_lang(Format),
    !,
    (   event_term_to_json_data(Event, JSON, Format)
    ->  reply_json(JSON)
    ;   % conversion failed: run it again inside assertion/1 so the
        % failure is reported
        assertion(event_term_to_json_data(Event, _, Format))
    ).
output_result(Format, _, _) :-      % FIXME: allow for non-JSON format
    domain_error(pengine_format, Format).
%!  portray_blob(+Blob, +Options) is det.
%
%   Write a blob as '$BLOB'(Type), where Type is the blob type
%   reported by blob/2. Future versions may include more info,
%   depending on Type.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _) :-
    blob(Blob, BlobType),
    writeq('$BLOB'(BlobType)).
%!  abort_pending_output(+Pengine) is det.
%
%   Signal every thread that is registered in pengine_replying/2 as
%   sending output for Pengine. Each such thread receives the
%   exception `pengine_abort_output`.

abort_pending_output(Pengine) :-
    forall(pengine_replying(Pengine, Replier),
           abort_output_thread(Replier)).

%   abort_output_thread(+Thread)
%
%   Make Thread throw `pengine_abort_output`. A thread that no
%   longer exists is silently ignored.

abort_output_thread(Replier) :-
    catch(thread_signal(Replier, throw(pengine_abort_output)),
          error(existence_error(thread, _), _),
          true).
%   Output formats: the core library supports `prolog` and various
%   JSON dialects. The hook event_to_json/3 can be used to refine
%   the JSON dialects. A separate hook must be used if a completely
%   different output format is desired (presumably write_result/3,
%   which output_result/3 tries first -- confirm).

%!  disable_client_cache
%
%   Write the HTTP header lines that tell the client not to cache
%   the reply.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').

%!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
%
%   Translate a pengine event term into a dict that can be sent as
%   JSON. The hook event_to_json/3 is tried first. The remaining
%   clauses cover the built-in translations:
%
%     - success/5 is only translated for Lang `json`; its bindings
%       are converted using term_to_json/2.
%     - destroy/2 and the answer(First) feature of create/2 embed
%       another event, which is translated recursively.
%     - error/2 gets the message text as `data` plus the details
%       added by add_error_details/3.
%     - Any other event with one or two arguments is translated
%       generically, with the functor name as `event`.
%
%   The original had a second, identical destroy(ID, Event) clause
%   after the create/2 clause. It could never be reached because the
%   first destroy/2 clause commits with a cut, so it is omitted.

event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    % Message is still unbound when the dict is built; it is filled
    % in by message_to_string/2 below.
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).

:- public add_error_details/3.
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Add error code and location information to the JSON dict of an
%   error event: add_error_code/3 is applied first, followed by
%   add_error_location/3. The original documentation mentions
%   pengines_io.pl in connection with this predicate, presumably as
%   another caller.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
%!  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a `code` field to JSON0 if Error is an ISO error term. The
%   error code is the functor name of the formal part of the error,
%   e.g., `syntax_error`, `type_error`, etc. Some errors carry more
%   information:
%
%     - existence_error(Type, Obj)
%       If Type is an atom, also add `arg1` (Type) and `arg2` (Obj
%       converted by to_atomic/2).
%
%   Any other term leaves the dict unchanged.

add_error_code(error(existence_error(Type, Culprit), _), JSON0, JSON) :-
    atom(Type),
    !,
    to_atomic(Culprit, Value),
    JSON = JSON0.put(_{code:existence_error, arg1:Type, arg2:Value}).
add_error_code(error(Formal, _), JSON0, JSON) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    JSON = JSON0.put(code, Code).
add_error_code(_, JSON, JSON).

%   to_atomic(+Obj, -Atomic)
%
%   Atoms, numbers and strings are passed unchanged. Any other term
%   is converted to a string using term_string/2.

% What to do with large integers?
to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
%!  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a `location` property if the error can be associated with a
%   source location. The location is an object with properties
%   `file` and `line` and, if available, the character location in
%   the line (`ch`). A character position of -1 is treated as not
%   available. Errors without a file(Path, Line, Ch, CharNo) context
%   leave the dict unchanged.

add_error_location(error(_, file(File, LineNo, -1, _)), JSON0, JSON) :-
    atom(File), integer(LineNo),
    !,
    JSON = JSON0.put(_{location:_{file:File, line:LineNo}}).
add_error_location(error(_, file(File, LineNo, LinePos, _)), JSON0, JSON) :-
    atom(File), integer(LineNo), integer(LinePos),
    !,
    JSON = JSON0.put(_{location:_{file:File, line:LineNo, ch:LinePos}}).
add_error_location(_, JSON, JSON).
%   Hook event_to_json/3: used to translate events such as
%   success(ID, Bindings, Projection, Time, More) and output(ID,
%   Term) into a format suitable for processing at the client side.

%:- multifile pengines:event_to_json/3.


                 /*******************************
                 *        ACCESS CONTROL        *
                 *******************************/
%!  allowed(+Request, +Application) is det.
%
%   Check whether the peer is allowed to connect. The peer must
%   match the setting Application:allow_from and must not match
%   Application:deny_from. Replies with a `forbidden` header if
%   contact is not allowed.
%
%   @throws http_reply(forbidden(URI)) if the peer is rejected.

allowed(Request, Application) :-
    setting(Application:allow_from, Allow),
    match_peer(Request, Allow),
    setting(Application:deny_from, Deny),
    \+ match_peer(Request, Deny),
    !.
allowed(Request, _Application) :-
    memberchk(request_uri(Here), Request),
    throw(http_reply(forbidden(Here))).

%   match_peer(+Request, +Allowed) is semidet.
%
%   True if the peer of Request matches the list Allowed. The atom
%   `*` in the list matches any peer and the empty list matches no
%   peer. Otherwise the peer must be a member of the list or match
%   one of its members as an IP pattern.

match_peer(_, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_, []) :- !, fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)
    ->  true
    ;   member(Pattern, Allowed),
        match_peer_pattern(Pattern, Peer)
    ).

%   match_peer_pattern(+Pattern, +Peer) is semidet.
%
%   True if the dotted address Peer matches Pattern. Pattern is a
%   dotted address that may end in `*`, e.g. '192.168.*'.

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, IP),
    ip_term(Peer, IP),
    !.

%   ip_term(+Text, -Parts)
%
%   Split Text on "." and convert the parts to a list of numbers.
%   A trailing `*` part leaves the tail of the list unbound.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", PartStrings),
    ip_pattern(PartStrings, Pattern).

ip_pattern([], []).
ip_pattern([*], _) :- !.
% split_string/4 produces strings, so the wildcard arrives as the
% string "*" and does not unify with the atom `*` above. Without
% this clause a pattern such as '192.168.*' can never match.
ip_pattern(["*"], _) :- !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
%!  authenticate(+Request, +Application, -UserOptions) is det.
%
%   Call authentication_hook/3. UserOptions is `[user(User)]`, `[]`
%   or an exception: if the hook succeeds, User must be ground and
%   is returned as [user(User)]; if it fails, UserOptions is [].

authenticate(Request, Application, UserOptions) :-
    authentication_hook(Request, Application, Who),
    !,
    must_be(ground, Who),
    UserOptions = [user(Who)].
authenticate(_, _, []).
%   Exceptions that the authentication hook may raise, as listed in
%   the original documentation:
%
%     - throw(http_reply(authorise(basic(Realm))))
%       Start a normal HTTP login challenge (reply 401)
%     - throw(http_reply(forbidden(Path)))
%       Reject the request using a 403 reply.

%!  pengine_register_user(+Options) is det.
%
%   If Options contains user(User), register User as the user of
%   the current pengine in pengine_user/2. Does nothing otherwise.

pengine_register_user(Options) :-
    option(user(Who), Options),
    !,
    pengine_self(Self),
    asserta(pengine_user(Self, Who)).
pengine_register_user(_).
%!  pengine_user(-User) is semidet.
%
%   True when User is registered in pengine_user/2 for the pengine
%   that runs the calling code.

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
%!  reply_options(+Request, +Methods) is semidet.
%
%   Reply to an HTTP `OPTIONS` request: enable CORS for the allowed
%   Methods and send an empty `text/plain` document. Fails for any
%   other request method.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    cors_enable(Request,
                [ methods(Methods)
                ]),
    format('Content-type: text/plain\r\n'),
    format('~n').                   % empty body


                 /*******************************
                 *        COMPILE SOURCE        *
                 *******************************/
%!  pengine_src_text(+Src, +Module) is det.
%
%   Load the source text Src into Module on behalf of the current
%   pengine. The text is loaded silently from a character stream
%   under the identifier `pengine://<Self>/src`, using the options
%   from extra_load_options/2. Afterwards keep_source/3 decides
%   whether the text is stored with the pengine.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(ID), 'pengine://~w/src', [Self]),
    extra_load_options(Self, Options),
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:ID,
                   [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ]),
        close(Stream)),
    keep_source(Self, ID, Src).

%   system:'#file'(+File, +Line)
%
%   Set the file name of the stream that is being loaded to File
%   and toggle position recording off and on again. Presumably this
%   handles `#file` markers in the loaded source -- confirm.
%
%   The listing read `system'#file'(...)`, which is a syntax error.
%   The `:` restores the module qualification so the clause is
%   defined in module `system`.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load the source found at URL into Module on behalf of the
%   current pengine. The load identifier is
%   `pengine://<Self>/url/<encoded URL>`.
%
%   If the application setting `debug_info` is `false`, the source
%   is loaded directly from the HTTP stream. Otherwise the document
%   is first read into a string, loaded from that string and handed
%   to keep_source/3. The HTTP stream is read as UTF-8 in both
%   cases.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, EncodedURL),
    format(atom(ID), 'pengine://~w/url/~w', [Self, EncodedURL]),
    extra_load_options(Self, Options),
    (   get_pengine_application(Self, App),
        setting(App:debug_info, false)
    ->  setup_call_cleanup(
            http_open(URL, In, []),
            ( set_stream(In, encoding(utf8)),
              load_files(Module:ID,
                         [ stream(In),
                           module(Module)
                         | Options
                         ])
            ),
            close(In))
    ;   setup_call_cleanup(
            http_open(URL, HTTPIn, []),
            ( set_stream(HTTPIn, encoding(utf8)),
              read_string(HTTPIn, _, Text)
            ),
            close(HTTPIn)),
        setup_call_cleanup(
            open_chars_stream(Text, In),
            load_files(Module:ID,
                       [ stream(In),
                         module(Module)
                       | Options
                       ]),
            close(In)),
        keep_source(Self, ID, Text)
    ).


%   extra_load_options(+Pengine, -Options)
%
%   Options is [] if pengine_not_sandboxed/1 holds for Pengine and
%   [sandboxed(true)] otherwise.

extra_load_options(Pengine, Options) :-
    pengine_not_sandboxed(Pengine),
    !,
    Options = [].
extra_load_options(_, [sandboxed(true)]).


%   keep_source(+Pengine, +ID, +SrcText)
%
%   If the application setting `debug_info` is `true`, store the
%   source text as a string in pengine_data/2 under source(ID, _).
%   Does nothing otherwise.

keep_source(Pengine, ID, SrcText) :-
    get_pengine_application(Pengine, App),
    setting(App:debug_info, true),
    !,
    to_string(SrcText, SrcString),
    assertz(pengine_data(Pengine, source(ID, SrcString))).
keep_source(_, _, _).

%   to_string(+Text, -String)
%
%   Pass a string unchanged; convert anything else using
%   atom_string/2.

to_string(Text, String) :-
    (   string(Text)
    ->  String = Text
    ;   atom_string(Text, String)
    ).

                 /*******************************
                 *            SANDBOX           *
                 *******************************/

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3195 3196 3197 /******************************* 3198 * MESSAGES * 3199 *******************************/ 3200 3201prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3202 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3203 'This is normally caused by an insufficiently instantiated'-[], nl, 3204 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3205 'find all possible instantations of Var.'-[] 3206 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.