1 | % (c) 2024-2024 Lehrstuhl fuer Softwaretechnik und Programmiersprachen, | |
2 | % Heinrich Heine Universitaet Duesseldorf | |
3 | % This software is licenced under EPL 1.0 (http://www.eclipse.org/org/documents/epl-v10.html | |
4 | ||
5 | :- module(zmq_rpc, [zmq_rpc_init/2, | |
6 | stream_socket_rpc_init/2, | |
7 | zmq_rpc_destroy/1, | |
8 | zmq_rpc_destroy_all/0, | |
9 | zmq_rpc_send/4, | |
10 | ||
11 | stream_socket_rpc_accept_request/3, | |
12 | stream_socket_rpc_send_answer/3, | |
13 | close_stream_server_socket/1 | |
14 | ]). | |
15 | ||
16 | :- use_module(probsrc(module_information)). | |
17 | :- module_info(group, zmq). | |
18 | :- module_info(description, 'RPC via ZMQ implementation (using JSON-RPC 2.0)'). | |
19 | ||
20 | % Using https://www.jsonrpc.org/specification | |
21 | % We do not support notifications | |
22 | % example JSON-RPC request sent for ZMQ_RPC_SEND(x,"hello",{"a"|->JsonNull}): | |
23 | % {"jsonrpc":"2.0","method":"hello","params":{"a":null},"id":1} | |
24 | % JsonValue = JsonNull, JsonBoolean(BOOL), | |
25 | % JsonNumber(FLOAT), JsonString(STRING), | |
26 | % JsonArray(seq(JsonValue)), JsonObject(STRING +-> JsonValue); | |
27 | % RpcResult = RpcSuccess(JsonValue), RpcError(STRING) | |
28 | ||
29 | :- use_module(probsrc(eventhandling), [register_event_listener/3]). | |
30 | :- register_event_listener(reset_specification, zmq_rpc_destroy_all, 'ZMQ cleanup'). | |
31 | ||
32 | :- use_module(probsrc(pathes_lib), [safe_load_foreign_resource/2]). | |
33 | :- use_module(probsrc(custom_explicit_sets), [expand_custom_set_to_list/2,convert_to_avl/2]). | |
34 | :- use_module(probsrc(tools_strings), [ajoin/2]). | |
35 | :- use_module(extrasrc(json_parser), [json_parse/2, json_write/2]). | |
36 | :- use_module(extrasrc(json_freetype), [json_parser_to_json_freetype/2, json_freetype_to_json_parser/2]). | |
37 | :- use_module(probsrc(error_manager),[add_error/3, add_warning/3, add_internal_error/2, add_message/3]). | |
38 | :- use_module(probsrc(debug),[debug_println/2, debug_format/3]). | |
39 | :- use_module(library(lists)). | |
40 | :- use_module(library(codesio), [with_output_to_codes/2]). | |
41 | :- use_module(library(sockets)). | |
42 | :- use_module(probsrc(gensym),[gennum/1]). | |
43 | ||
% Foreign-language interface declarations for the zmq_rpc resource.
% Each foreign/2 fact associates a foreign function name with the Prolog
% predicate it implements and the argument conversions to use.
foreign(zmq_rpc_init_impl,
        zmq_rpc_init_impl(+string, [-address])).
foreign(zmq_rpc_destroy_impl,
        zmq_rpc_destroy_impl(+address)).
foreign(zmq_rpc_send_str_impl,
        zmq_rpc_send_str_impl(+address, +codes)).
foreign(zmq_rpc_receive_str_impl,
        zmq_rpc_receive_str_impl(+address, [-codes])).

% The foreign functions provided by the resource zmq_rpc.
foreign_resource(zmq_rpc, [
    zmq_rpc_init_impl,
    zmq_rpc_destroy_impl,
    zmq_rpc_send_str_impl,
    zmq_rpc_receive_str_impl
]).
49 | ||
:- dynamic loaded/0.

% init_zmq_rpc: load the foreign resource zmq_rpc on first use.
% The loaded/0 flag is recorded before the load is attempted, so the load
% is attempted at most once.
init_zmq_rpc :-
    loaded, !.
init_zmq_rpc :-
    assertz(loaded),
    safe_load_foreign_resource(zmq_rpc, zmq_rpc).
57 | ||
58 | :- dynamic open_socket/2. | |
% open_socket(InternalSocketNr,SocketTerm)
60 | % SocketTerm is either integer for ZMQ or sicstus_stream_socket(Stream) | |
61 | ||
% API to decide how to send/receive messages depending on socket type
% rpc_send_codes_request(SocketTerm, Codes): send the request Codes on the given socket.
rpc_send_codes_request(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  % send request as a single line; supposes Codes does not contain newline characters
        %format(user_output,'>>> ~s~n',[Codes]),
        format(Stream, '~s~n', [Codes]),
        flush_output(Stream)
    ;   % any other socket term is sent via the ZMQ API
        zmq_rpc_send_str_impl(SocketTerm, Codes)
    ).
69 | ||
% rpc_receive_codes_result(SocketTerm, Codes): receive one answer on the given socket.
% For stream sockets one line is read with read_line/2; otherwise the ZMQ API is used.
rpc_receive_codes_result(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  read_line(Stream, Codes)
    ;   zmq_rpc_receive_str_impl(SocketTerm, Codes)
    ).
72 | ||
% rpc_destroy_socket(SocketTerm): close a stream socket or destroy a ZMQ socket.
rpc_destroy_socket(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  close(Stream)
    ;   zmq_rpc_destroy_impl(SocketTerm)
    ).
75 | ||
% rpc_init(SocketTerm): make sure the infrastructure needed for this kind of socket is ready.
rpc_init(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(_)
    ->  true              % no init required for these kinds of sockets
    ;   init_zmq_rpc      % all other socket terms use the ZMQ foreign resource
    ).
78 | ||
79 | % ------------------------ | |
80 | ||
% set-up a regular Stream socket using single-line ndjson messages
% stream_socket_rpc_init(PortTerm, Out): connect as client to PortTerm and return
% Out = int(SocketNr), where SocketNr is the internal number registered in open_socket/2.
stream_socket_rpc_init(PortTerm,Out) :-
    % TODO: also allow to pass nodename, with empty ('') a connection is made to local machine
    Address = '':PortTerm,
    StreamOptions = [type(text),encoding(utf8)],
    catch(socket_client_open(Address, Stream, StreamOptions),
          Exc,
          ( add_error(zmq_rpc,'Cannot open socket as client:',Exc),
            fail )),
    gennum(Counter),
    % we assume negative numbers not used by ZMQ,
    % makes API easier to use/type for end-users than returning e.g. string(Name)
    SocketNr is -1-Counter,
    (   open_socket(SocketNr,_)
    ->  add_internal_error('Socket number clash:',SocketNr)
    ;   true
    ),
    assertz(open_socket(SocketNr,sicstus_stream_socket(Stream))),
    Out = int(SocketNr).
92 | ||
93 | ||
94 | % ------------------------ | |
95 | ||
% set-up a ZMQ socket
% zmq_rpc_init(Endpoint, Out): Endpoint must be an atom; on success the value
% returned by the foreign function is registered in open_socket/2 (as both the
% internal number and the socket term) and returned as Out = int(Handle).
% Fails if Endpoint is not an atom or the returned value is not an integer.
zmq_rpc_init(Endpoint, Out) :-
    atom(Endpoint),
    init_zmq_rpc,
    zmq_rpc_init_impl(Endpoint, Handle),
    debug_println(9, zmq_rpc_init_impl(Endpoint, Handle)),
    integer(Handle),
    assertz(open_socket(Handle, Handle)),
    Out = int(Handle).
105 | ||
106 | % ------------------------ | |
107 | ||
% destroy a socket with internal number SocketNr, works with all kinds of sockets
% Reports an error and fails if the socket is not registered in open_socket/2;
% otherwise destroys the socket and removes its registration.
zmq_rpc_destroy(Socket) :-
    (   \+ open_socket(Socket,_)
    ->  add_error(zmq_rpc_send,'Socket not open for ZMQ_RPC_DESTROY (use ZMQ_RPC_INIT or SMQ_RPC_INIT first):',Socket),
        fail
    ;   open_socket(Socket,SocketTerm),
        rpc_init(SocketTerm),
        rpc_destroy_socket(SocketTerm),
        retractall(open_socket(Socket,_))
    ).
117 | ||
% zmq_rpc_destroy_all: destroy every socket registered in open_socket/2.
% Always succeeds (unless destroying a socket raises an exception).
% Each registration is retracted BEFORE the socket is destroyed: a socket whose
% destruction fails or throws must not stay registered, otherwise every later
% cleanup (this predicate is registered as reset_specification listener) would
% stop at the very same entry again.
zmq_rpc_destroy_all :-
    retract(open_socket(SocketNr,SocketTerm)),
    (   rpc_init(SocketTerm),
        rpc_destroy_socket(SocketTerm)
    ->  true
    ;   add_warning(zmq_rpc,'Could not destroy socket:',SocketNr)
    ),
    fail. % failure-driven loop over all registered sockets
zmq_rpc_destroy_all.
120 | ||
121 | % ------------------------ | |
122 | ||
% send a request on a socket; works with all sockets
% zmq_rpc_send(SocketNr, Name, Args, ResAst):
%   SocketNr  internal socket number registered in open_socket/2
%   Name      atom, the JSON-RPC method name
%   Args      passed as the contents of a JsonObject freetype value (the params)
%   ResAst    freeval('RpcResult',...) built from the JSON-RPC response
% Fails (after reporting an error) if the socket is not open, the server does not
% answer, the answer cannot be parsed or its id does not match the request id.
zmq_rpc_send(Socket, _Name, _Args, _ResAst) :- \+ open_socket(Socket,_),!,
    add_error(zmq_rpc_send,'Socket not open for ZMQ_RPC_SEND (use ZMQ_RPC_INIT first):',Socket),
    fail.
zmq_rpc_send(SocketNr, Name, Args, ResAst) :-
    open_socket(SocketNr,Socket),
    atom(Name),
    !,
    next_rpc_request_id(Id),
    write_json_rpc_request(Name, Args, Id, JsonRpcReq),
    safe_json_write(JsonRpcReq, ReqCodes),
    rpc_init(Socket),
    debug_format(19,'Sending JSON-RPC on socket ~w: ~s~n',[SocketNr,ReqCodes]),
    rpc_send_codes_request(Socket, ReqCodes),
    rpc_receive_codes_result(Socket, ResCodes),
    % check for end of file first: the debug output below prints ResCodes with ~s,
    % which expects a code list and not the atom end_of_file
    (   ResCodes == end_of_file
    ->  add_error(zmq_rpc,'JSON-RPC server not responding:',ResCodes),
        fail
    ;   true
    ),
    debug_format(19,'Received JSON-RPC result on socket ~w: ~s~n',[SocketNr,ResCodes]),
    (   json_parse(ResCodes, ResJson)
    ->  true
    ;   add_error(zmq_rpc,'Cannot parse JSON-RPC result:',ResCodes),
        fail
    ),
    extract_json_rpc_result(ResJson, ReceivedId, ResAst),
    (   ReceivedId = Id
    ->  true
    ;   ajoin(['Id ',ReceivedId,' of JSON-RPC result object does not match expected id:'],Msg),
        add_error(zmq_rpc,Msg,Id),
        fail
    ).
zmq_rpc_send(Socket, Name, ArgsAst, ResAst) :-
    % only reached when the socket is open but Name is not an atom
    add_internal_error('Illegal call to ZMQ_RPC_SEND:',zmq_rpc_send(Socket, Name, ArgsAst, ResAst)),
    fail.
151 | ||
% write_json_rpc_request(Method, Params, Id, Json):
% build the json/1 term of a JSON-RPC 2.0 request object; Params is wrapped
% as a JsonObject freetype value before conversion; a numeric Id is emitted
% as JSON number, any other Id as JSON string.
write_json_rpc_request(Method, Params, Id, Json) :-
    ParamsValue = freeval('JsonValue', 'JsonObject', Params),
    json_freetype_to_json_parser(ParamsValue, JsonParams),
    (   number(Id)
    ->  JsonId = number(Id)
    ;   JsonId = string(Id)
    ),
    Members = [jsonrpc=string('2.0'),
               method=string(Method),
               params=JsonParams,
               id=JsonId],
    Json = json(Members).
156 | ||
% write_json_rpc_answer(Id, Answer, Json):
% turn an RpcResult freetype value into the json/1 term of a response object.
% Anything that is neither RpcSuccess(_) nor RpcError(string(_)) is reported
% as an error and answered with an internal-server-error response.
write_json_rpc_answer(Id, Answer, Json) :-
    (   Answer = freeval('RpcResult', 'RpcSuccess', ResultTerm)
    ->  write_json_rpc_success_answer(Id, ResultTerm, Json)
    ;   Answer = freeval('RpcResult', 'RpcError', string(ErrorMsg))
    ->  write_json_rpc_error_answer(Id, ErrorMsg, Json)
    ;   add_error(zmq_rpc,'Not a valid RpcResult freevalue:',Answer),
        write_json_rpc_error_answer(Id, 'Internal server error: illegal result object', Json)
    ).
164 | ||
% write_json_rpc_success_answer(Id, ResultTerm, Json):
% build the json/1 term of a successful response carrying the converted ResultTerm.
write_json_rpc_success_answer(Id, ResultTerm, Json) :-
    json_freetype_to_json_parser(ResultTerm, JsonResult),
    (   number(Id)
    ->  JsonId = number(Id)
    ;   JsonId = string(Id)
    ),
    Members = [jsonrpc=string('2.0'),
               result=JsonResult,
               id=JsonId],
    Json = json(Members).
% write_json_rpc_error_answer(Id, String, Json):
% build the json/1 term of an error response for request Id.
% JSON-RPC 2.0 requires the error member to be an object with an integer
% code and a string message (a bare string is not a valid error value, and
% extract_error_result/2 in this module reads code/message from an object).
% Code -32000 lies in the range the specification reserves for
% implementation-defined server errors.
write_json_rpc_error_answer(Id, String, Json) :-
    (   number(Id)
    ->  JsonId = number(Id)
    ;   JsonId = string(Id)
    ),
    ErrorObject = json([code=number(-32000), message=string(String)]),
    Json = json([jsonrpc=string('2.0'), error=ErrorObject, id=JsonId]).
172 | ||
173 | ||
:- dynamic last_rpc_request_id/1.

% next_rpc_request_id(Id): Id is the successor of the last id handed out
% (1 on the first call); the new id is stored as the only last_rpc_request_id/1 fact.
next_rpc_request_id(Id) :-
    (   last_rpc_request_id(Previous)
    ->  true
    ;   Previous = 0
    ),
    Id is Previous+1,
    retractall(last_rpc_request_id(_)),
    assertz(last_rpc_request_id(Id)).
179 | ||
% extract_json_rpc_result(ResponseJson, Id, ResAst):
% decompose a parsed JSON-RPC response object into its id (number or string
% payload) and an RpcResult freetype value (RpcError if an error member is
% present, RpcSuccess otherwise). A version other than 2.0 only yields a warning.
% Reports an error and fails if the id is missing or invalid, or if the term
% is not a JSON object with a string-valued jsonrpc member.
extract_json_rpc_result(json(Members), Id, ResAst) :-
    memberchk(jsonrpc=string(Version), Members), !,
    (   Version = '2.0'
    ->  true
    ;   add_warning(zmq_rpc,'Expected version 2.0 JSON-RPC object: ',Version)
    ),
    (   memberchk(id=JsonId, Members)
    ->  (   JsonId = number(Id) -> true
        ;   JsonId = string(Id) -> true
        ;   add_error(zmq_rpc,'Cannot extract valid id from result: ',JsonId),
            fail
        )
    ;   add_error(zmq_rpc,'JSON-RPC result object has no id: ',Members),
        fail
    ),
    (   memberchk(error=ErrorValue, Members)
    ->  Kind = 'RpcError',
        extract_error_result(ErrorValue, Wrapped)
    ;   Kind = 'RpcSuccess',
        extract_success_result(Members, Wrapped)
    ),
    ResAst = freeval('RpcResult', Kind, Wrapped).
extract_json_rpc_result(Response, _Id, _ResAst) :-
    add_error(zmq_rpc,'Response object does not conform to JSON-RPC:',Response),
    fail.
197 | ||
% extract_error_result(E, string(Res)):
% render the error member E of a JSON-RPC response as the atom
% 'error <Code>: <Message>'.
% E is normally a JSON object with members code and message; a missing code
% defaults to -1 and a missing message to 'N/A'.
% Servers that do not follow the specification may send a bare JSON string as
% error value; its text is then used as the message instead of being dropped.
extract_error_result(E, string(Res)) :-
    (   E = json(ErrorObject) -> true
    ;   ErrorObject = []
    ),
    (   memberchk(code=number(Code), ErrorObject) -> true
    ;   Code = -1
    ),
    (   memberchk(message=string(Message), ErrorObject) -> true
    ;   E = string(Message) -> true
    ;   Message = 'N/A'
    ),
    ajoin(['error ', Code, ': ', Message], Res).
203 | ||
% extract_success_result(JsonObject, ResAst):
% convert the result member of a response object to a JSON freetype value;
% reports an error and fails if there is no result member.
extract_success_result(JsonObject, ResAst) :-
    (   memberchk(result=Res, JsonObject)
    ->  json_parser_to_json_freetype(Res, ResAst)
    ;   add_error(zmq_rpc,'Response object has no result attribute:',JsonObject),
        fail
    ).
210 | ||
211 | ||
212 | % ------------------------ | |
213 | % a simple server for test-purposes | |
214 | ||
215 | :- dynamic open_server_socket/2, open_server_client_stream/3. | |
216 | ||
% stream_socket_init(Port, Socket):
% return the server socket already registered for Port, or open a new one
% (loopback only, i.e., only accepting connections from the local machine)
% and register it in open_server_socket/2.
stream_socket_init(Port,Socket) :-
    (   open_server_socket(Port,Existing)
    ->  Socket = Existing
    ;   socket_server_open(Port,Socket,[loopback(true)]),
        assertz(open_server_socket(Port,Socket)),
        format('Opened RPC-JSON socket (for ndjson, UTF8 streams) on port ~w~n',[Port])
    ).
225 | ||
% stream_socket_rpc_server_stream_init(Port, Stream):
% return the client stream already registered for Port, or accept a new client
% connection on the server socket for Port and register its stream
% in open_server_client_stream/3.
stream_socket_rpc_server_stream_init(Port,Stream) :-
    (   open_server_client_stream(Port,_Client,Existing)
    ->  Stream = Existing
    ;   stream_socket_init(Port,Socket),
        socket_server_accept(Socket, Client, Stream, [type(text),encoding(utf8)]),
        assertz(open_server_client_stream(Port,Client,Stream)),
        format('Client connected: ~w~n',[Client])
    ).
234 | ||
% close_stream_server_socket(Port):
% close the client stream (if any) and the server socket registered for Port
% and remove their registrations; only prints a message if no socket is
% registered for Port.
close_stream_server_socket(Port) :-
    open_server_socket(Port,Socket),!,
    % requests still waiting for an answer can no longer be answered once the
    % client stream is closed; drop them so that a stale id is neither reported
    % as unanswered forever nor answered on the stream of a later client
    retractall(open_server_request(Port,_)),
    (   retract(open_server_client_stream(Port,_,Stream))
    ->  close(Stream)
    ;   true
    ),
    format('Closing socket on port ~w~n',[Port]),
    socket_server_close(Socket),
    retract(open_server_socket(Port,Socket)).
close_stream_server_socket(Port) :-
    add_message(zmq_rpc,'No socket for port open: ',Port).
243 | ||
:- dynamic open_server_request/2.
% predicate to accept a JSON-RPC request on a given port
% it opens socket and waits for client if necessary
% stream_socket_rpc_accept_request(Port, Id, FreeVal):
%   - a request line was read: Id is the numeric id of the request, FreeVal
%     the JSON freetype value of the request object; the request is recorded
%     in open_server_request/2 until it is answered
%   - end of file was read: the socket for Port is closed, Id is left unbound
%     and FreeVal is the freetype value of JSON null
% The first clause only emits a warning for every request on Port that is still
% unanswered and then falls through to the second clause.
stream_socket_rpc_accept_request(Port,_,_) :-
    open_server_request(Port,Id),
    add_warning(zmq_rpc,'Previous JSON-RPC request has not been answered: ',Port:Id),
    fail.
stream_socket_rpc_accept_request(Port,Id,FreeVal) :-
    stream_socket_rpc_server_stream_init(Port,Stream),
    read_line(Stream,ResCodes),!,
    (   ResCodes == end_of_file
    ->  % do not print ResCodes with ~s here: it is an atom, not a code list
        close_stream_server_socket(Port), % or only close stream?
        json_parser_to_json_freetype('@'(null),FreeVal)
    ;   format(user_output,'Received request line: ~s~n',[ResCodes]),
        (   json_parse(ResCodes, json(ResJson))
        ->  true
        ;   add_error(zmq_rpc,'Cannot parse JSON-RPC request:',ResCodes),
            fail
        ),
        write(' Prolog: '),write(ResJson),nl,
        (   member(id=number(Id),ResJson)
        ->  true
        ;   add_error(zmq_rpc,'JSON-RPC request has no id:',ResJson),
            fail
        ),
        assert(open_server_request(Port,Id)),
        json_parser_to_json_freetype(json(ResJson),FreeVal)
    ).
265 | ||
% stream_socket_rpc_send_answer(Port, Id, AnswerTerm):
% answer the pending request Id on Port: AnswerTerm (an RpcResult freetype
% value) is written as a single JSON line to the client stream and the request
% is removed from open_server_request/2.
% Reports an error and fails if there is no such pending request or no client stream.
stream_socket_rpc_send_answer(Port,Id,AnswerTerm) :-
    (   open_server_request(Port,Id),
        open_server_client_stream(Port,_,Stream)
    ->  write_json_rpc_answer(Id, AnswerTerm, JsonAnswer),
        safe_json_write(JsonAnswer, AnswerCodes),
        format(user_output,'Response for request ~w >>> ~s~n',[Id,AnswerCodes]),
        format(Stream,'~s~n',[AnswerCodes]),
        flush_output(Stream),
        retract(open_server_request(Port,Id))
    ;   add_error(zmq_rpc,'No open server request on port:id ',Port:Id),
        fail
    ).
278 | ||
% safe_json_write(Json, Codes): like json_write/2 (first solution only), but
% reports an error before failing when Json cannot be converted.
safe_json_write(Json,Codes) :-
    json_write(Json,Codes), !.
safe_json_write(Json,_Codes) :-
    add_error(zmq_rpc,'Cannot convert JSON:',Json),
    fail.
281 | ||
282 | % ---------------------- | |
:- public test_server/1.
% test_server(Port): run the test server loop on Port; the server socket for
% Port is closed once the loop has terminated (via call_cleanup/2).
test_server(Port) :-
    call_cleanup(
        test_server_loop(Port),
        close_stream_server_socket(Port)
    ).
% test_server_loop(Port): accept requests on Port and answer each of them with
% the error 'not implemented' until the client closes the connection.
% At end of file stream_socket_rpc_accept_request/3 leaves Id unbound and
% returns the freetype value of JSON null (never the atom end_of_file), so
% termination is detected via var(Id).
test_server_loop(Port) :-
    stream_socket_rpc_accept_request(Port,Id,FreeVal),
    (   ( var(Id) ; FreeVal == end_of_file )
    ->  true
    ;   write(' B: '), translate:print_bvalue(FreeVal),nl,
        % TODO: provide predicate to send response
        stream_socket_rpc_send_answer(Port,Id,freeval('RpcResult','RpcError',string('not implemented'))),
        % open_server_client_stream(Port,_,Stream),
        % format(user_output,'sending response >>> {"jsonrpc":"2.0","id":~w,"error":"not_implemented"}~n',[Id]),
        % format(Stream,'{"jsonrpc":"2.0","id":~w,"error":"not_implemented"}~n',[Id]),
        % %format(Stream,'{"jsonrpc":"2.0","id":~w,"result":true}~n',[Id]), % --> RpcSuccess(JsonBoolean(TRUE)) for client
        % flush_output(Stream),
        % retract(open_server_request(Port,Id)),
        test_server_loop(Port)
    ).
301 |