1 % (c) 2024-2024 Lehrstuhl fuer Softwaretechnik und Programmiersprachen,
2 % Heinrich Heine Universitaet Duesseldorf
3 % This software is licenced under EPL 1.0 (http://www.eclipse.org/org/documents/epl-v10.html
4
5 :- module(zmq_rpc, [zmq_rpc_init/2,
6 stream_socket_rpc_init/2,
7 zmq_rpc_destroy/1,
8 zmq_rpc_destroy_all/0,
9 zmq_rpc_send/4,
10
11 stream_socket_rpc_accept_request/3,
12 stream_socket_rpc_send_answer/3,
13 close_stream_server_socket/1
14 ]).
15
16 :- use_module(probsrc(module_information)).
17 :- module_info(group, zmq).
18 :- module_info(description, 'RPC via ZMQ implementation (using JSON-RPC 2.0)').
19
20 % Using https://www.jsonrpc.org/specification
21 % We do not support notifications
22 % example JSON-RPC request sent for ZMQ_RPC_SEND(x,"hello",{"a"|->JsonNull}):
23 % {"jsonrpc":"2.0","method":"hello","params":{"a":null},"id":1}
24 % JsonValue = JsonNull, JsonBoolean(BOOL),
25 % JsonNumber(FLOAT), JsonString(STRING),
26 % JsonArray(seq(JsonValue)), JsonObject(STRING +-> JsonValue);
27 % RpcResult = RpcSuccess(JsonValue), RpcError(STRING)
28
29 :- use_module(probsrc(eventhandling), [register_event_listener/3]).
30 :- register_event_listener(reset_specification, zmq_rpc_destroy_all, 'ZMQ cleanup').
31
32 :- use_module(probsrc(pathes_lib), [safe_load_foreign_resource/2]).
33 :- use_module(probsrc(custom_explicit_sets), [expand_custom_set_to_list/2,convert_to_avl/2]).
34 :- use_module(probsrc(tools_strings), [ajoin/2]).
35 :- use_module(extrasrc(json_parser), [json_parse/2, json_write/2]).
36 :- use_module(extrasrc(json_freetype), [json_parser_to_json_freetype/2, json_freetype_to_json_parser/2]).
37 :- use_module(probsrc(error_manager),[add_error/3, add_warning/3, add_internal_error/2, add_message/3]).
38 :- use_module(probsrc(debug),[debug_println/2, debug_format/3]).
39 :- use_module(library(lists)).
40 :- use_module(library(codesio), [with_output_to_codes/2]).
41 :- use_module(library(sockets)).
42 :- use_module(probsrc(gensym),[gennum/1]).
43
% Foreign (C) interface of the zmq_rpc resource.
% Each foreign/2 fact maps a C function name to the Prolog predicate
% and its argument passing modes; [-address] marks the C return value.
foreign(zmq_rpc_init_impl,
        zmq_rpc_init_impl(+string, [-address])).
foreign(zmq_rpc_destroy_impl,
        zmq_rpc_destroy_impl(+address)).
foreign(zmq_rpc_send_str_impl,
        zmq_rpc_send_str_impl(+address, +codes)).
foreign(zmq_rpc_receive_str_impl,
        zmq_rpc_receive_str_impl(+address, [-codes])).

% All C functions that are bundled in the zmq_rpc foreign resource.
foreign_resource(zmq_rpc, [
    zmq_rpc_init_impl,
    zmq_rpc_destroy_impl,
    zmq_rpc_send_str_impl,
    zmq_rpc_receive_str_impl
]).
49
:- dynamic loaded/0.

% init_zmq_rpc
% Loads the zmq_rpc foreign resource the first time it is called.
% The loaded/0 flag is set *before* the load attempt, so the load is
% attempted at most once per session, even if it does not succeed.
init_zmq_rpc :-
    loaded,
    !.
init_zmq_rpc :-
    assertz(loaded),
    safe_load_foreign_resource(zmq_rpc, zmq_rpc).
57
:- dynamic open_socket/2.
% open_socket(InternalSocketNr, SocketTerm)
% SocketTerm is either an integer (ZMQ socket address) or
% sicstus_stream_socket(Stream) for a plain SICStus stream socket.

% Small dispatch layer: decides how to send/receive/destroy/init
% depending on the kind of SocketTerm.

% rpc_send_codes_request(+SocketTerm, +Codes)
% Sends the request Codes over the given socket.
rpc_send_codes_request(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  % stream sockets: one request per line (ndjson);
        % this supposes that Codes contains no newline characters
        %format(user_output,'>>> ~s~n',[Codes]),
        format(Stream,'~s~n',[Codes]),
        flush_output(Stream)
    ;   % everything else is sent via the ZMQ API
        zmq_rpc_send_str_impl(SocketTerm, Codes)
    ).
69
% rpc_receive_codes_result(+SocketTerm, -Codes)
% Reads one answer from the socket: a single line for stream sockets
% (read_line/2 yields end_of_file when the stream is exhausted),
% otherwise a message obtained via the ZMQ API.
rpc_receive_codes_result(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  read_line(Stream, Codes)
    ;   zmq_rpc_receive_str_impl(SocketTerm, Codes)
    ).
72
% rpc_destroy_socket(+SocketTerm)
% Releases the underlying resource: closes the stream of a stream
% socket, or destroys the ZMQ socket via the foreign interface.
rpc_destroy_socket(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  close(Stream)
    ;   zmq_rpc_destroy_impl(SocketTerm)
    ).
75
% rpc_init(+SocketTerm)
% Makes sure the backend required for SocketTerm is ready to use.
% Stream sockets need no initialisation; for all other sockets the
% ZMQ foreign resource is loaded (if not already done).
rpc_init(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(_)
    ->  true
    ;   init_zmq_rpc
    ).
78
79 % ------------------------
80
% stream_socket_rpc_init(+PortTerm, -Out)
% Sets up a regular stream socket (client side) using single-line
% ndjson messages and registers it in open_socket/2.
% Out is int(SocketNr), where SocketNr is a fresh negative number.
stream_socket_rpc_init(PortTerm, Out) :-
    % TODO: also allow to pass nodename; with the empty node ('') a
    % connection is made to the local machine
    ClientOptions = [type(text),encoding(utf8)],
    catch(socket_client_open('':PortTerm, Stream, ClientOptions),
          Exc,
          ( add_error(zmq_rpc,'Cannot open socket as client:',Exc),
            fail
          )),
    gennum(Counter),
    % we assume negative numbers are not used by ZMQ; a plain number
    % makes the API easier to use/type for end-users than returning
    % e.g. string(Name)
    SocketNr is -1-Counter,
    (   open_socket(SocketNr,_)
    ->  add_internal_error('Socket number clash:',SocketNr)
    ;   true
    ),
    assertz(open_socket(SocketNr,sicstus_stream_socket(Stream))),
    Out = int(SocketNr).
92
93
94 % ------------------------
95
% zmq_rpc_init(+Endpoint, -Out)
% Sets up a ZMQ socket for the given endpoint (an atom) and registers
% it in open_socket/2. Out is int(Handle), where Handle is the integer
% returned by the foreign init function; it serves both as the
% user-visible socket number and as the internal socket term.
% Fails if Endpoint is not an atom or no integer handle is returned.
zmq_rpc_init(Endpoint, Out) :-
    atom(Endpoint),
    init_zmq_rpc,
    zmq_rpc_init_impl(Endpoint, Handle),
    debug_println(9,zmq_rpc_init_impl(Endpoint, Handle)),
    integer(Handle),
    assertz(open_socket(Handle,Handle)),
    Out = int(Handle).
105
106 % ------------------------
107
% zmq_rpc_destroy(+SocketNr)
% Destroys the socket registered under the internal number SocketNr;
% works for all kinds of sockets. Reports an error and fails if no
% such socket is registered in open_socket/2.
zmq_rpc_destroy(SocketNr) :-
    \+ open_socket(SocketNr,_),
    !,
    add_error(zmq_rpc_send,'Socket not open for ZMQ_RPC_DESTROY (use ZMQ_RPC_INIT or SMQ_RPC_INIT first):',SocketNr),
    fail.
zmq_rpc_destroy(SocketNr) :-
    open_socket(SocketNr,SocketTerm),
    % make sure the backend is available before releasing the socket
    rpc_init(SocketTerm),
    rpc_destroy_socket(SocketTerm),
    % forget every registration stored under this number
    retractall(open_socket(SocketNr,_)).
117
% zmq_rpc_destroy_all
% Destroys every socket registered in open_socket/2; always succeeds.
% This predicate is registered as a reset_specification listener, so a
% single socket that cannot be destroyed (e.g. close/1 raising an
% exception on a broken connection) must not abort the clean-up of the
% remaining sockets, nor make every later reset fail again.
zmq_rpc_destroy_all :-
    (   open_socket(SocketNr,_)
    ->  (   catch(zmq_rpc_destroy(SocketNr),
                  Exc,
                  ( add_error(zmq_rpc,'Exception while destroying socket:',SocketNr:Exc),
                    fail
                  ))
        ->  true
        ;   % destroy failed or raised: drop the registration anyway so
            % that the loop makes progress and terminates
            retractall(open_socket(SocketNr,_))
        ),
        zmq_rpc_destroy_all
    ;   true
    ).
120
121 % ------------------------
122
% zmq_rpc_send(+SocketNr, +Name, +Args, -ResAst)
% Sends the JSON-RPC request Name(Args) on the socket registered under
% SocketNr and waits for the answer; works with all kinds of sockets.
%   Name   : method name (an atom)
%   Args   : parameters, wrapped into a JsonObject freetype value
%   ResAst : freeval('RpcResult','RpcSuccess',_) or
%            freeval('RpcResult','RpcError',_)
% Reports an error and fails if the socket is not open, the server does
% not respond, the answer cannot be parsed, or the answer id does not
% match the id of the request.
zmq_rpc_send(Socket, _Name, _Args, _ResAst) :-
    \+ open_socket(Socket,_),
    !,
    add_error(zmq_rpc_send,'Socket not open for ZMQ_RPC_SEND (use ZMQ_RPC_INIT first):',Socket),
    fail.
zmq_rpc_send(SocketNr, Name, Args, ResAst) :-
    open_socket(SocketNr,Socket),
    atom(Name),
    !,
    next_rpc_request_id(Id),
    write_json_rpc_request(Name, Args, Id, JsonRpcReq),
    safe_json_write(JsonRpcReq, ReqCodes),
    rpc_init(Socket),
    debug_format(19,'Sending JSON-RPC on socket ~w: ~s~n',[SocketNr,ReqCodes]),
    rpc_send_codes_request(Socket, ReqCodes),
    rpc_receive_codes_result(Socket, ResCodes),
    % Check for end_of_file *before* printing with ~s: ~s expects a code
    % list, so printing the atom end_of_file would raise a format error
    % in debug mode instead of reporting the unresponsive server.
    (   ResCodes == end_of_file
    ->  add_error(zmq_rpc,'JSON-RPC server not responding:',ResCodes),
        fail
    ;   debug_format(19,'Received JSON-RPC result on socket ~w: ~s~n',[SocketNr,ResCodes])
    ),
    (   json_parse(ResCodes, ResJson)
    ->  true
    ;   add_error(zmq_rpc,'Cannot parse JSON-RPC result:',ResCodes),
        fail
    ),
    extract_json_rpc_result(ResJson, ReceivedId, ResAst),
    (   ReceivedId = Id
    ->  true
    ;   ajoin(['Id ',ReceivedId,' of JSON-RPC result object does not match expected id:'],Msg),
        add_error(zmq_rpc,Msg,Id),
        fail
    ).
zmq_rpc_send(Socket, Name, ArgsAst, ResAst) :-
    % only reached when the socket is open but Name is not an atom
    add_internal_error('Illegal call to ZMQ_RPC_SEND:',zmq_rpc_send(Socket, Name, ArgsAst, ResAst)),
    fail.
151
% write_json_rpc_request(+Method, +Params, +Id, -Json)
% Builds the json_parser term of a JSON-RPC 2.0 request object.
% Params is the content of a JsonObject freetype value.
write_json_rpc_request(Method, Params, Id, Json) :-
    json_freetype_to_json_parser(freeval('JsonValue', 'JsonObject', Params), JsonParams),
    json_rpc_id_term(Id, JsonId),
    Json = json([jsonrpc=string('2.0'), method=string(Method), params=JsonParams, id=JsonId]).

% write_json_rpc_answer(+Id, +RpcResult, -Json)
% Builds the json_parser term of a JSON-RPC 2.0 response for request Id
% from an RpcResult freetype value (RpcSuccess(JsonValue) or
% RpcError(STRING)). Any other value is reported as an error and turned
% into an error answer.
write_json_rpc_answer(Id, freeval('RpcResult', 'RpcSuccess', ResultTerm), Json) :- !,
    write_json_rpc_success_answer(Id, ResultTerm, Json).
write_json_rpc_answer(Id, freeval('RpcResult', 'RpcError', string(ErrorMsg)), Json) :- !,
    write_json_rpc_error_answer(Id, ErrorMsg, Json).
write_json_rpc_answer(Id, Other, Json) :-
    add_error(zmq_rpc,'Not a valid RpcResult freevalue:',Other),
    write_json_rpc_error_answer(Id, 'Internal server error: illegal result object', Json).

% write_json_rpc_success_answer(+Id, +ResultTerm, -Json)
% Response object carrying ResultTerm (a JsonValue freetype value).
write_json_rpc_success_answer(Id, ResultTerm, Json) :-
    json_freetype_to_json_parser(ResultTerm, JsonResult),
    json_rpc_id_term(Id, JsonId),
    Json = json([jsonrpc=string('2.0'), result=JsonResult, id=JsonId]).

% write_json_rpc_error_answer(+Id, +String, -Json)
% Response object carrying the error text String.
% NOTE(review): the error member is written as a plain JSON string; the
% JSON-RPC 2.0 specification describes it as an object with "code" and
% "message" members. Kept as is so that the wire format stays unchanged.
write_json_rpc_error_answer(Id, String, Json) :-
    json_rpc_id_term(Id, JsonId),
    Json = json([jsonrpc=string('2.0'), error=string(String), id=JsonId]).

% json_rpc_id_term(+Id, -JsonId)
% Wraps a request id for the JSON writer: numbers become number(Id),
% everything else is written as string(Id).
json_rpc_id_term(Id, JsonId) :-
    (   number(Id)
    ->  JsonId = number(Id)
    ;   JsonId = string(Id)
    ).
172
173
:- dynamic last_rpc_request_id/1.

% next_rpc_request_id(-Id)
% Yields the id for the next JSON-RPC request: 1 for the first request,
% otherwise the previously issued id plus one. The issued id is stored
% in last_rpc_request_id/1.
next_rpc_request_id(Id) :-
    (   last_rpc_request_id(Previous)
    ->  Next is Previous+1
    ;   Next = 1
    ),
    Id = Next,
    retractall(last_rpc_request_id(_)),
    assertz(last_rpc_request_id(Next)).
179
% extract_json_rpc_result(+Json, -Id, -ResAst)
% Decomposes a parsed JSON-RPC response object into its id and an
% RpcResult freetype value (RpcError if the object has an error member,
% RpcSuccess otherwise). A version other than 2.0 only yields a
% warning. Reports an error and fails if Json is not an object with a
% jsonrpc member, or if no valid id can be extracted.
extract_json_rpc_result(json(JsonObject), Id, ResAst) :-
    member(jsonrpc=string(Version), JsonObject),
    !,
    (   Version = '2.0'
    ->  true
    ;   add_warning(zmq_rpc,'Expected version 2.0 JSON-RPC object: ',Version)
    ),
    extract_json_rpc_id(JsonObject, Id),
    (   member(error=E, JsonObject)
    ->  extract_error_result(E, WrappedRes),
        ResAst = freeval('RpcResult', 'RpcError', WrappedRes)
    ;   extract_success_result(JsonObject, WrappedRes),
        ResAst = freeval('RpcResult', 'RpcSuccess', WrappedRes)
    ).
extract_json_rpc_result(Obj, _Id, _ResAst) :-
    add_error(zmq_rpc,'Response object does not conform to JSON-RPC:',Obj),
    fail.

% extract_json_rpc_id(+JsonObject, -Id)
% Id of the response object; the id may be a JSON number or string.
extract_json_rpc_id(JsonObject, Id) :-
    memberchk(id=JsonId, JsonObject),
    !,
    (   JsonId = number(Id)
    ->  true
    ;   JsonId = string(Id)
    ->  true
    ;   add_error(zmq_rpc,'Cannot extract valid id from result: ',JsonId),
        fail
    ).
extract_json_rpc_id(JsonObject, _Id) :-
    add_error(zmq_rpc,'JSON-RPC result object has no id: ',JsonObject),
    fail.
197
% extract_error_result(+E, -StringTerm)
% Turns the error member E of a JSON-RPC response into
% string('error <Code>: <Message>').
% E is normally a JSON object with "code" and "message" members; a
% missing code defaults to -1 and a missing message to 'N/A'.
% A plain JSON string is accepted as well and used as the message: this
% is the form written by write_json_rpc_error_answer/3 in this module,
% whose text would otherwise be lost.
extract_error_result(E, string(Res)) :-
    (   E = json(ErrorObject)
    ->  DefaultMessage = 'N/A'
    ;   E = string(Text)
    ->  ErrorObject = [],
        DefaultMessage = Text
    ;   ErrorObject = [],
        DefaultMessage = 'N/A'
    ),
    (   member(code=number(Code), ErrorObject)
    ->  true
    ;   Code = -1
    ),
    (   member(message=string(Message), ErrorObject)
    ->  true
    ;   Message = DefaultMessage
    ),
    ajoin(['error ', Code, ': ', Message], Res).
203
% extract_success_result(+JsonObject, -ResAst)
% Converts the result member of a JSON-RPC response object into a
% JsonValue freetype value. Reports an error and fails if the object
% has no result member.
extract_success_result(JsonObject, ResAst) :-
    (   member(result=Res, JsonObject)
    ->  json_parser_to_json_freetype(Res, ResAst)
    ;   add_error(zmq_rpc,'Response object has no result attribute:',JsonObject),
        fail
    ).
210
211
% ------------------------
% a simple server for test purposes

:- dynamic open_server_socket/2, open_server_client_stream/3.

% stream_socket_init(+Port, -Socket)
% Returns the server socket already registered for Port, or opens a new
% one (accepting connections from the local machine only) and
% registers it in open_server_socket/2.
stream_socket_init(Port, Socket) :-
    (   open_server_socket(Port, Existing)
    ->  Socket = Existing
    ;   Loopback = true, % only accept connections from local machine
        socket_server_open(Port, Socket, [loopback(Loopback)]),
        assert(open_server_socket(Port,Socket)),
        format('Opened RPC-JSON socket (for ndjson, UTF8 streams) on port ~w~n',[Port])
    ).
225
% stream_socket_rpc_server_stream_init(+Port, -Stream)
% Returns the stream of the client already connected on Port; if there
% is none, opens the server socket if necessary, waits for a client to
% connect and registers it in open_server_client_stream/3.
stream_socket_rpc_server_stream_init(Port, Stream) :-
    (   open_server_client_stream(Port, _Client, Existing)
    ->  Stream = Existing
    ;   stream_socket_init(Port, Socket),
        socket_server_accept(Socket, Client, Stream, [type(text),encoding(utf8)]),
        assert(open_server_client_stream(Port,Client,Stream)),
        format('Client connected: ~w~n',[Client])
    ).
234
% close_stream_server_socket(+Port)
% Closes the client stream (if any) and the server socket registered
% for Port and removes their registrations. Only emits a message if no
% server socket is open for Port.
close_stream_server_socket(Port) :-
    open_server_socket(Port,Socket),
    !,
    (   retract(open_server_client_stream(Port,_,Stream))
    ->  close(Stream)
    ;   true
    ),
    % Requests still pending on this port can no longer be answered once
    % the client stream is gone; forget them, otherwise every later
    % accept on this port would warn about them again.
    retractall(open_server_request(Port,_)),
    format('Closing socket on port ~w~n',[Port]),
    socket_server_close(Socket),
    retract(open_server_socket(Port,Socket)).
close_stream_server_socket(Port) :-
    add_message(zmq_rpc,'No socket for port open: ',Port).
243
:- dynamic open_server_request/2.
% open_server_request(Port, Id): request Id received on Port has not
% been answered yet.

% stream_socket_rpc_accept_request(+Port, -Id, -FreeVal)
% Accepts one JSON-RPC request (a single line) on the given port; opens
% the server socket and waits for a client if necessary.
% On success Id is the numeric id of the request and FreeVal the
% request object converted to a JsonValue freetype value.
% If the client has closed the connection (end of file), the server
% socket is closed, Id is left unbound and FreeVal is the conversion of
% JSON null.
stream_socket_rpc_accept_request(Port,_,_) :-
    % warn about every request on this port that is still unanswered
    open_server_request(Port,Id),
    add_warning(zmq_rpc,'Previous JSON-RPC request has not been answered: ',Port:Id),
    fail.
stream_socket_rpc_accept_request(Port,Id,FreeVal) :-
    stream_socket_rpc_server_stream_init(Port,Stream),
    read_line(Stream,ResCodes),
    !,
    % Check for end_of_file before printing the line: ~s expects a code
    % list and would raise a format error for the atom end_of_file.
    (   ResCodes == end_of_file
    ->  format(user_output,'Received end of file on port ~w~n',[Port]),
        close_stream_server_socket(Port), % or only close stream?
        json_parser_to_json_freetype('@'(null),FreeVal)
    ;   format(user_output,'Received request line: ~s~n',[ResCodes]),
        (   json_parse(ResCodes, json(ResJson))
        ->  true
        ;   add_error(zmq_rpc,'Cannot parse JSON-RPC request as JSON object:',ResCodes),
            fail
        ),
        write(' Prolog: '),write(ResJson),nl,
        (   member(id=number(Id),ResJson)
        ->  true
        ;   add_error(zmq_rpc,'JSON-RPC request has no id:',ResJson),
            fail
        ),
        assert(open_server_request(Port,Id)),
        json_parser_to_json_freetype(json(ResJson),FreeVal)
    ).
265
% stream_socket_rpc_send_answer(+Port, +Id, +AnswerTerm)
% Answers the pending request Id on Port: AnswerTerm (an RpcResult
% freetype value) is written as a single JSON line to the client
% stream, after which the request is no longer recorded as pending.
% Reports an error and fails if there is no such pending request or no
% connected client on Port.
stream_socket_rpc_send_answer(Port, Id, AnswerTerm) :-
    (   open_server_request(Port,Id),
        open_server_client_stream(Port,_,Stream)
    ->  write_json_rpc_answer(Id, AnswerTerm, JsonAnswer),
        safe_json_write(JsonAnswer, AnswerCodes),
        format(user_output,'Response for request ~w >>> ~s~n',[Id,AnswerCodes]),
        format(Stream,'~s~n',[AnswerCodes]),
        flush_output(Stream),
        retract(open_server_request(Port,Id))
    ;   add_error(zmq_rpc,'No open server request on port:id ',Port:Id),
        fail
    ).
278
% safe_json_write(+Json, -Codes)
% Serialises the json_parser term Json to a code list (first solution
% of json_write/2 only); reports an error and fails if that fails.
safe_json_write(Json, Codes) :-
    json_write(Json, Codes),
    !.
safe_json_write(Json, _Codes) :-
    add_error(zmq_rpc,'Cannot convert JSON:',Json),
    fail.
281
% ----------------------
:- public test_server/1.
% test_server(+Port)
% Simple test server: answers every JSON-RPC request received on Port
% with the error 'not implemented' until the client disconnects. The
% server socket is closed when the loop ends, for whatever reason.
test_server(Port) :-
    call_cleanup(test_server_loop(Port),
                 close_stream_server_socket(Port)).

% test_server_loop(+Port)
% Accepts and answers requests until the client closes the connection.
test_server_loop(Port) :-
    stream_socket_rpc_accept_request(Port,Id,FreeVal),
    (   % On end of file stream_socket_rpc_accept_request/3 leaves Id
        % unbound and returns the conversion of JSON null (not the atom
        % end_of_file), so the unbound Id is what signals the end here.
        ( FreeVal == end_of_file ; var(Id) )
    ->  true
    ;   write(' B: '), translate:print_bvalue(FreeVal), nl,
        % TODO: provide predicate to send response
        % (an answer {"jsonrpc":"2.0","id":Id,"result":true} would yield
        %  RpcSuccess(JsonBoolean(TRUE)) for the client)
        stream_socket_rpc_send_answer(Port,Id,freeval('RpcResult','RpcError',string('not implemented'))),
        test_server_loop(Port)
    ).
301