| 1 | % (c) 2024-2024 Lehrstuhl fuer Softwaretechnik und Programmiersprachen, | |
| 2 | % Heinrich Heine Universitaet Duesseldorf | |
% This software is licenced under EPL 1.0 (http://www.eclipse.org/org/documents/epl-v10.html)
| 4 | ||
| 5 | :- module(zmq_rpc, [zmq_rpc_init/2, | |
| 6 | stream_socket_rpc_init/2, | |
| 7 | zmq_rpc_destroy/1, | |
| 8 | zmq_rpc_destroy_all/0, | |
| 9 | zmq_rpc_send/4, | |
| 10 | ||
| 11 | stream_socket_rpc_accept_request/3, | |
| 12 | stream_socket_rpc_send_answer/3, | |
| 13 | close_stream_server_socket/1 | |
| 14 | ]). | |
| 15 | ||
| 16 | :- use_module(probsrc(module_information)). | |
| 17 | :- module_info(group, zmq). | |
| 18 | :- module_info(description, 'RPC via ZMQ implementation (using JSON-RPC 2.0)'). | |
| 19 | ||
| 20 | % Using https://www.jsonrpc.org/specification | |
| 21 | % We do not support notifications | |
| 22 | % example JSON-RPC request sent for ZMQ_RPC_SEND(x,"hello",{"a"|->JsonNull}): | |
| 23 | % {"jsonrpc":"2.0","method":"hello","params":{"a":null},"id":1} | |
| 24 | % JsonValue = JsonNull, JsonBoolean(BOOL), | |
| 25 | % JsonNumber(FLOAT), JsonString(STRING), | |
| 26 | % JsonArray(seq(JsonValue)), JsonObject(STRING +-> JsonValue); | |
| 27 | % RpcResult = RpcSuccess(JsonValue), RpcError(STRING) | |
| 28 | ||
| 29 | :- use_module(probsrc(eventhandling), [register_event_listener/3]). | |
| 30 | :- register_event_listener(reset_specification, zmq_rpc_destroy_all, 'ZMQ cleanup'). | |
| 31 | ||
| 32 | :- use_module(probsrc(pathes_lib), [safe_load_foreign_resource/2]). | |
| 33 | :- use_module(probsrc(custom_explicit_sets), [expand_custom_set_to_list/2,convert_to_avl/2]). | |
| 34 | :- use_module(probsrc(tools_strings), [ajoin/2]). | |
| 35 | :- use_module(extrasrc(json_parser), [json_parse/2, json_write/2]). | |
| 36 | :- use_module(extrasrc(json_freetype), [json_parser_to_json_freetype/2, json_freetype_to_json_parser/2]). | |
| 37 | :- use_module(probsrc(error_manager),[add_error/3, add_warning/3, add_internal_error/2, add_message/3]). | |
| 38 | :- use_module(probsrc(debug),[debug_println/2, debug_format/3]). | |
| 39 | :- use_module(library(lists)). | |
| 40 | :- use_module(library(codesio), [with_output_to_codes/2]). | |
| 41 | :- use_module(library(sockets)). | |
| 42 | :- use_module(probsrc(gensym),[gennum/1]). | |
| 43 | ||
% Foreign-language interface of the zmq_rpc resource.
% Each foreign/2 fact pairs a C function name with the Prolog predicate it implements;
% an argument in square brackets ([-Type]) is the return value of the C function.
foreign(zmq_rpc_init_impl,
        zmq_rpc_init_impl(+string, [-address])).
foreign(zmq_rpc_destroy_impl,
        zmq_rpc_destroy_impl(+address)).
foreign(zmq_rpc_send_str_impl,
        zmq_rpc_send_str_impl(+address, +codes)).
foreign(zmq_rpc_receive_str_impl,
        zmq_rpc_receive_str_impl(+address, [-codes])).

% All C functions that are part of the zmq_rpc foreign resource.
foreign_resource(zmq_rpc, [
    zmq_rpc_init_impl,
    zmq_rpc_destroy_impl,
    zmq_rpc_send_str_impl,
    zmq_rpc_receive_str_impl
]).
| 49 | ||
:- dynamic loaded/0.

% init_zmq_rpc:
%  load the zmq_rpc foreign resource on first use; later calls are no-ops.
%  The loaded/0 flag is recorded only after safe_load_foreign_resource/2 has
%  succeeded: if the load fails or raises an exception, the next call tries
%  again instead of skipping the load because of a stale flag.
init_zmq_rpc :-
    loaded, !.
init_zmq_rpc :-
    safe_load_foreign_resource(zmq_rpc, zmq_rpc),
    assertz(loaded).
| 57 | ||
| 58 | :- dynamic open_socket/2. | |
% open_socket(InternalSocketNr,SocketTerm)
% SocketTerm is either an integer (for ZMQ sockets) or sicstus_stream_socket(Stream)
| 61 | ||
% API to decide how to send/receive messages depending on socket type

% rpc_send_codes_request(+SocketTerm, +Codes):
%  transmit the request Codes (a code list) over the given socket.
rpc_send_codes_request(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  % stream socket: the request is sent as a single line,
        % which supposes that Codes does not contain newline characters
        %format(user_output,'>>> ~s~n',[Codes]),
        format(Stream,'~s~n',[Codes]),
        flush_output(Stream)
    ;   % any other socket term is sent via the ZMQ API
        zmq_rpc_send_str_impl(SocketTerm, Codes)
    ).
| 69 | ||
% rpc_receive_codes_result(+SocketTerm, -Codes):
%  receive one answer from the socket. Stream sockets are read with read_line/2
%  (one answer per line); all other socket terms are read via the ZMQ API.
rpc_receive_codes_result(SocketTerm, Codes) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  read_line(Stream, Codes)
    ;   zmq_rpc_receive_str_impl(SocketTerm, Codes)
    ).
| 72 | ||
% rpc_destroy_socket(+SocketTerm):
%  release the underlying connection: close the stream of a stream socket,
%  or hand any other socket term to the ZMQ destroy function.
rpc_destroy_socket(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(Stream)
    ->  close(Stream)
    ;   zmq_rpc_destroy_impl(SocketTerm)
    ).
| 75 | ||
% rpc_init(+SocketTerm):
%  make sure the backend needed for SocketTerm is ready for use.
rpc_init(SocketTerm) :-
    (   SocketTerm = sicstus_stream_socket(_)
    ->  true            % no init required for these kinds of sockets
    ;   init_zmq_rpc    % ZMQ sockets need the foreign resource
    ).
| 78 | ||
| 79 | % ------------------------ | |
| 80 | ||
% set-up a regular Stream socket using single-line ndjson messages
% stream_socket_rpc_init(+PortTerm, -Out):
%  open a client connection to PortTerm, register the resulting stream in
%  open_socket/2 under a fresh negative number and return it as int(SocketNr).
%  If the connection cannot be opened an error is added and the call fails.
stream_socket_rpc_init(PortTerm,Out) :-
    % TODO: also allow to pass nodename, with empty ('') a connection is made to local machine
    Address = '':PortTerm,
    catch(socket_client_open(Address, Stream, [type(text),encoding(utf8)]),
          Exc,
          ( add_error(zmq_rpc,'Cannot open socket as client:',Exc),
            fail
          )),
    gennum(Counter),
    % we assume negative numbers are not used by ZMQ; a plain number makes the
    % API easier to use/type for end-users than returning e.g. string(Name)
    SocketNr is -1-Counter,
    (   \+ open_socket(SocketNr,_)
    ->  true
    ;   add_internal_error('Socket number clash:',SocketNr)
    ),
    assertz(open_socket(SocketNr,sicstus_stream_socket(Stream))),
    Out = int(SocketNr).
| 92 | ||
| 93 | ||
| 94 | % ------------------------ | |
| 95 | ||
% set-up a ZMQ socket
% zmq_rpc_init(+Endpoint, -Out):
%  Endpoint must be an atom; it is passed to the foreign function zmq_rpc_init_impl.
%  The integer handle returned by that function is registered in open_socket/2
%  (it serves both as socket number and as socket term) and returned as int(Handle).
%  Fails silently if Endpoint is not an atom or the returned handle is not an integer.
zmq_rpc_init(Endpoint, Out) :-
    atom(Endpoint),
    init_zmq_rpc,
    zmq_rpc_init_impl(Endpoint, Handle),
    debug_println(9,zmq_rpc_init_impl(Endpoint, Handle)),
    integer(Handle),
    assertz(open_socket(Handle,Handle)),
    Out = int(Handle).
| 105 | ||
| 106 | % ------------------------ | |
| 107 | ||
% destroy a socket with internal number Socket, works with all kinds of sockets
% zmq_rpc_destroy(+Socket):
%  - if Socket is not registered in open_socket/2 an error is added and the call fails
%  - otherwise the underlying socket is destroyed and all open_socket/2 entries
%    for Socket are removed
% NOTE(review): 'SMQ_RPC_INIT' in the error message looks like a typo for the name
%  of the stream-socket init function; the text is left unchanged - please confirm.
zmq_rpc_destroy(Socket) :-
    \+ open_socket(Socket,_), !,
    % report under this predicate's own name (was zmq_rpc_send, copied from zmq_rpc_send/4)
    add_error(zmq_rpc_destroy,'Socket not open for ZMQ_RPC_DESTROY (use ZMQ_RPC_INIT or SMQ_RPC_INIT first):',Socket),
    fail.
zmq_rpc_destroy(Socket) :-
    open_socket(Socket,SocketTerm), !,   % commit: do not leave a choice point on the lookup
    rpc_init(SocketTerm),
    rpc_destroy_socket(SocketTerm),
    retractall(open_socket(Socket,_)).
| 117 | ||
% zmq_rpc_destroy_all:
%  destroy every socket registered in open_socket/2 (registered above as listener
%  for the reset_specification event).
%  A socket whose destruction fails or raises an exception is reported and its
%  entry is dropped anyway, so that the remaining sockets are still cleaned up
%  and the loop always terminates.
zmq_rpc_destroy_all :-
    open_socket(Socket,_), !,
    catch((   zmq_rpc_destroy(Socket)
          ->  true
          ;   add_warning(zmq_rpc,'Could not destroy socket, dropping it: ',Socket)
          ),
          Exc,
          add_error(zmq_rpc,'Exception while destroying socket:',Exc)),
    retractall(open_socket(Socket,_)),   % no-op after a successful destroy
    zmq_rpc_destroy_all.
zmq_rpc_destroy_all.
| 120 | ||
| 121 | % ------------------------ | |
| 122 | ||
% send a request on a socket; works with all sockets
% zmq_rpc_send(+SocketNr, +Name, +Args, -ResAst):
%  SocketNr : number under which the socket is registered in open_socket/2
%  Name     : atom, becomes the "method" of the JSON-RPC request
%  Args     : wrapped as freeval('JsonValue','JsonObject',Args), becomes "params"
%  ResAst   : freeval('RpcResult','RpcSuccess',_) or freeval('RpcResult','RpcError',_)
%  The call fails after adding an error if the socket is not open, the peer does
%  not answer, the answer cannot be parsed, or the answer carries a different id.
zmq_rpc_send(Socket, _Name, _Args, _ResAst) :- \+ open_socket(Socket,_),!,
    add_error(zmq_rpc_send,'Socket not open for ZMQ_RPC_SEND (use ZMQ_RPC_INIT first):',Socket),
    fail.
zmq_rpc_send(SocketNr, Name, Args, ResAst) :-
    open_socket(SocketNr,Socket),
    atom(Name),
    !,
    next_rpc_request_id(Id),
    write_json_rpc_request(Name, Args, Id, JsonRpcReq),
    safe_json_write(JsonRpcReq, ReqCodes),
    rpc_init(Socket),
    debug_format(19,'Sending JSON-RPC on socket ~w: ~s~n',[SocketNr,ReqCodes]),
    rpc_send_codes_request(Socket, ReqCodes),
    rpc_receive_codes_result(Socket, ResCodes),
    % A stream socket yields the atom end_of_file when the peer has closed the
    % connection. Check this before the debug output below: its ~s directive
    % needs a code list and must not be given that atom.
    (   ResCodes == end_of_file
    ->  add_error(zmq_rpc,'JSON-RPC server not responding:',ResCodes),
        fail
    ;   true
    ),
    debug_format(19,'Received JSON-RPC result on socket ~w: ~s~n',[SocketNr,ResCodes]),
    (   json_parse(ResCodes, ResJson)
    ->  true
    ;   add_error(zmq_rpc,'Cannot parse JSON-RPC result:',ResCodes),
        fail
    ),
    extract_json_rpc_result(ResJson, ReceivedId, ResAst),
    % the answer must belong to the request we have just sent
    (   ReceivedId=Id
    ->  true
    ;   ajoin(['Id ',ReceivedId,' of JSON-RPC result object does not match expected id:'],Msg),
        add_error(zmq_rpc,Msg,Id),
        fail
    ).
zmq_rpc_send(Socket, Name, ArgsAst, ResAst) :-
    % socket is open but the call is malformed (Name is not an atom)
    add_internal_error('Illegal call to ZMQ_RPC_SEND:',zmq_rpc_send(Socket, Name, ArgsAst, ResAst)),
    fail.
| 151 | ||
% write_json_rpc_request(+Method, +Params, +Id, -Json):
%  build the JSON-RPC 2.0 request term for the JSON writer.
%  Params is wrapped into a JsonObject freetype value and converted to the
%  representation of the JSON parser; a numeric Id becomes a JSON number,
%  any other Id a JSON string.
write_json_rpc_request(Method, Params, Id, Json) :-
    ParamsFreeval = freeval('JsonValue', 'JsonObject', Params),
    json_freetype_to_json_parser(ParamsFreeval, JsonParams),
    (   \+ number(Id)
    ->  JsonId = string(Id)
    ;   JsonId = number(Id)
    ),
    Json = json([ jsonrpc = string('2.0'),
                  method  = string(Method),
                  params  = JsonParams,
                  id      = JsonId
                ]).
| 156 | ||
% write_json_rpc_answer(+Id, +Answer, -Json):
%  turn an RpcResult freetype value into the JSON-RPC answer term for request Id.
%  RpcSuccess(Value) gives a result answer, RpcError(string(Msg)) an error answer;
%  anything else is reported and answered with an internal server error.
write_json_rpc_answer(Id, Answer, Json) :-
    (   Answer = freeval('RpcResult', 'RpcSuccess', ResultTerm)
    ->  write_json_rpc_success_answer(Id, ResultTerm, Json)
    ;   Answer = freeval('RpcResult', 'RpcError', string(ErrorMsg))
    ->  write_json_rpc_error_answer(Id, ErrorMsg, Json)
    ;   add_error(zmq_rpc,'Not a valid RpcResult freevalue:',Answer),
        write_json_rpc_error_answer(Id, 'Internal server error: illegal result object', Json)
    ).
| 164 | ||
% write_json_rpc_success_answer(+Id, +ResultTerm, -Json):
%  build the JSON-RPC 2.0 answer term carrying ResultTerm (a JSON freetype value)
%  as "result"; a numeric Id becomes a JSON number, any other Id a JSON string.
write_json_rpc_success_answer(Id, ResultTerm, Json) :-
    json_freetype_to_json_parser(ResultTerm, JsonResult),
    (   \+ number(Id)
    ->  JsonId = string(Id)
    ;   JsonId = number(Id)
    ),
    Json = json([ jsonrpc = string('2.0'),
                  result  = JsonResult,
                  id      = JsonId
                ]).
% write_json_rpc_error_answer(+Id, +String, -Json):
%  build the JSON-RPC 2.0 error answer term for request Id with message String.
%  The specification requires "error" to be an object with a "code" and a
%  "message" member (a bare string is not allowed). As this predicate receives
%  no specific code, -32000 is used, which lies in the range the specification
%  reserves for implementation-defined server errors.
%  A numeric Id becomes a JSON number, any other Id a JSON string.
write_json_rpc_error_answer(Id, String, Json) :-
    (number(Id) -> JsonId = number(Id) ; JsonId = string(Id)),
    ErrorObject = json([code=number(-32000), message=string(String)]),
    Json = json([jsonrpc=string('2.0'), error=ErrorObject, id=JsonId]).
| 172 | ||
| 173 | ||
:- dynamic last_rpc_request_id/1.

% next_rpc_request_id(-Id):
%  return a fresh request id: 1 on the first call, afterwards the successor of
%  the id handed out last; the new id is remembered in last_rpc_request_id/1.
next_rpc_request_id(Id) :-
    (   last_rpc_request_id(Previous)
    ->  Id is Previous+1
    ;   Id = 1
    ),
    retractall(last_rpc_request_id(_)),
    assertz(last_rpc_request_id(Id)).
| 179 | ||
% extract_json_rpc_result(+Json, -Id, -ResAst):
%  decompose a parsed JSON-RPC answer object.
%  Id     : the value of the "id" member (JSON number or string)
%  ResAst : freeval('RpcResult','RpcError',_) if the object has an "error" member,
%           otherwise freeval('RpcResult','RpcSuccess',_) built from "result"
%  A version other than 2.0 only triggers a warning. An error is added and the
%  call fails if the term is no object with a "jsonrpc" member or has no usable id.
extract_json_rpc_result(json(JsonObject), Id, ResAst) :-
    member(jsonrpc=string(Version), JsonObject), !,
    (   Version \= '2.0'
    ->  add_warning(zmq_rpc,'Expected version 2.0 JSON-RPC object: ',Version)
    ;   true
    ),
    (   memberchk(id=JsonId, JsonObject)
    ->  json_rpc_result_id(JsonId, Id)
    ;   add_error(zmq_rpc,'JSON-RPC result object has no id: ',JsonObject),
        fail
    ),
    (   member(error=E, JsonObject)
    ->  Kind = 'RpcError',
        extract_error_result(E, WrappedRes)
    ;   Kind = 'RpcSuccess',
        extract_success_result(JsonObject, WrappedRes)
    ),
    ResAst = freeval('RpcResult', Kind, WrappedRes).
extract_json_rpc_result(Obj, _Id, _ResAst) :-
    add_error(zmq_rpc,'Response object does not conform to JSON-RPC:',Obj),
    fail.

% json_rpc_result_id(+JsonId, -Id): unwrap a JSON number or string used as id;
%  any other JSON value is reported as error and makes the call fail.
json_rpc_result_id(number(Id), Id) :- !.
json_rpc_result_id(string(Id), Id) :- !.
json_rpc_result_id(JsonId, _) :-
    add_error(zmq_rpc,'Cannot extract valid id from result: ',JsonId),
    fail.
| 197 | ||
% extract_error_result(+E, -Result):
%  convert the value E of the "error" member of a JSON-RPC answer into
%  string(Res), where Res is the atom 'error <Code>: <Message>'.
%  Code defaults to -1 and Message to 'N/A' when they are missing.
%  Besides the error object required by the specification, a plain JSON string
%  is accepted as well and used as the message, so that the text sent by a
%  non-conforming peer is not lost.
extract_error_result(E, string(Res)) :-
    (E=json(ErrorObject) -> true ; ErrorObject=[]),
    (member(code=number(Code), ErrorObject) -> true ; Code = -1),
    (   member(message=string(Message), ErrorObject) -> true
    ;   E=string(Message) -> true      % lenient: "error":"some text"
    ;   Message = 'N/A'
    ),
    ajoin(['error ', Code, ': ', Message], Res).
| 203 | ||
% extract_success_result(+JsonObject, -ResAst):
%  convert the value of the "result" member of the answer object (a list of
%  Key=Value pairs) into a JSON freetype value; if there is no such member an
%  error is added and the call fails.
extract_success_result(JsonObject, ResAst) :-
    (   member(result=Res, JsonObject)
    ->  json_parser_to_json_freetype(Res, ResAst)
    ;   add_error(zmq_rpc,'Response object has no result attribute:',JsonObject),
        fail
    ).
| 210 | ||
| 211 | ||
| 212 | % ------------------------ | |
| 213 | % a simple server for test-purposes | |
| 214 | ||
:- dynamic open_server_socket/2, open_server_client_stream/3.
% open_server_socket(Port,ServerSocket)
% open_server_client_stream(Port,Client,Stream)

% stream_socket_init(+Port, -Socket):
%  return the server socket for Port; it is opened and recorded in
%  open_server_socket/2 on first use and re-used afterwards.
stream_socket_init(Port,Socket) :-
    (   open_server_socket(Port,Existing)
    ->  Socket = Existing
    ;   Loopback = true, % only accept connections from local machine
        socket_server_open(Port,Socket,[loopback(Loopback)]),
        assert(open_server_socket(Port,Socket)),
        format('Opened RPC-JSON socket (for ndjson, UTF8 streams) on port ~w~n',[Port])
    ).
| 225 | ||
% stream_socket_rpc_server_stream_init(+Port, -Stream):
%  return the stream of the client connected on Port. If no client stream is
%  recorded yet, the server socket is set up if necessary, the next client is
%  accepted and its stream is recorded in open_server_client_stream/3.
stream_socket_rpc_server_stream_init(Port,Stream) :-
    (   open_server_client_stream(Port,_Client,Existing)
    ->  Stream = Existing
    ;   stream_socket_init(Port,ServerSocket),
        socket_server_accept(ServerSocket, Client, Stream, [type(text),encoding(utf8)]),
        assert(open_server_client_stream(Port,Client,Stream)),
        format('Client connected: ~w~n',[Client])
    ).
| 234 | ||
% close_stream_server_socket(+Port):
%  close the client stream (if any) and the server socket opened on Port and
%  remove the corresponding bookkeeping facts. Requests recorded in
%  open_server_request/2 for this port are forgotten as well: once the client
%  stream is closed they cannot be answered any more and would otherwise
%  trigger 'has not been answered' warnings when the port is used again.
%  If no server socket is open on Port only a message is added.
close_stream_server_socket(Port) :-
    open_server_socket(Port,Socket),!,
    retractall(open_server_request(Port,_)),
    (retract(open_server_client_stream(Port,_,Stream)) -> close(Stream) ; true),
    format('Closing socket on port ~w~n',[Port]),
    socket_server_close(Socket),
    retract(open_server_socket(Port,Socket)).
close_stream_server_socket(Port) :-
    add_message(zmq_rpc,'No socket for port open: ',Port).
| 243 | ||
:- dynamic open_server_request/2.
% open_server_request(Port,Id): request Id was received on Port and not answered yet

% predicate to accept a JSON-RPC request on a given port
% it opens socket and waits for client if necessary
% stream_socket_rpc_accept_request(+Port, -Id, -FreeVal):
%  - regular request: Id is the numeric id of the request, FreeVal the request
%    object as JSON freetype value; the request is recorded as open
%  - client closed the connection (end of file): the server socket is closed,
%    Id stays unbound and FreeVal is the freetype value for JSON null
%  - the call fails after adding an error if the line is no JSON object or
%    the object has no numeric id
stream_socket_rpc_accept_request(Port,_,_) :-
    % only warn about unanswered requests, then continue with the next clause
    open_server_request(Port,Id),
    add_warning(zmq_rpc,'Previous JSON-RPC request has not been answered: ',Port:Id),
    fail.
stream_socket_rpc_accept_request(Port,Id,FreeVal) :-
    stream_socket_rpc_server_stream_init(Port,Stream),
    read_line(Stream,ResCodes),!,
    (   ResCodes == end_of_file
    ->  % must be checked before the line is printed: the ~s directive needs a
        % code list and cannot print the atom end_of_file
        close_stream_server_socket(Port), % or only close stream?
        json_parser_to_json_freetype('@'(null),FreeVal)
    ;   format(user_output,'Received request line: ~s~n',[ResCodes]),
        (   json_parse(ResCodes, json(ResJson))
        ->  true
        ;   add_error(zmq_rpc,'Cannot parse JSON-RPC request object:',ResCodes),
            fail
        ),
        write(' Prolog: '),write(ResJson),nl,
        (   member(id=number(Id),ResJson)
        ->  true
        ;   add_error(zmq_rpc,'JSON-RPC request has no id:',ResJson),
            fail
        ),
        assert(open_server_request(Port,Id)),
        json_parser_to_json_freetype(json(ResJson),FreeVal)
    ).
| 265 | ||
% stream_socket_rpc_send_answer(+Port, +Id, +AnswerTerm):
%  answer the open request Id on Port with AnswerTerm (an RpcResult freetype
%  value): the answer is written as a single line to the client stream and the
%  request is removed from open_server_request/2.
%  An error is added and the call fails if there is no such open request or no
%  client stream for the port.
stream_socket_rpc_send_answer(Port,Id,AnswerTerm) :-
    (   open_server_request(Port,Id),
        open_server_client_stream(Port,_,Stream)
    ->  write_json_rpc_answer(Id, AnswerTerm, JsonAnswer),
        safe_json_write(JsonAnswer, AnswerCodes),
        format(user_output,'Response for request ~w >>> ~s~n',[Id,AnswerCodes]),
        format(Stream,'~s~n',[AnswerCodes]),
        flush_output(Stream),
        retract(open_server_request(Port,Id))
    ;   add_error(zmq_rpc,'No open server request on port:id ',Port:Id),
        fail
    ).
| 278 | ||
% safe_json_write(+Json, -Codes):
%  like json_write/2, but adds an error before failing when the term cannot
%  be converted.
safe_json_write(Json,Codes) :-
    json_write(Json,Codes), !.
safe_json_write(Json,_Codes) :-
    add_error(zmq_rpc,'Cannot convert JSON:',Json),
    fail.
| 281 | ||
| 282 | % ---------------------- | |
:- public test_server/1.
% test_server(+Port):
%  run the test server loop on Port; the server socket of the port is closed
%  when the loop terminates, whatever the outcome.
test_server(Port) :-
    call_cleanup(
        test_server_loop(Port),
        close_stream_server_socket(Port)
    ).
% test_server_loop(+Port):
%  accept requests on Port one after the other and answer each of them with the
%  error 'not implemented', until the client closes the connection.
%  End of input is recognised by the request id being unbound:
%  stream_socket_rpc_accept_request/3 binds Id for every regular request and
%  leaves it unbound at end of file, where it returns the freetype value for
%  JSON null (not the atom end_of_file) as FreeVal.
test_server_loop(Port) :-
    stream_socket_rpc_accept_request(Port,Id,FreeVal),
    (   ( var(Id) ; FreeVal == end_of_file )
    ->  true
    ;   write(' B: '), translate:print_bvalue(FreeVal),nl,
        % TODO: provide predicate to send response
        % (an answer {"jsonrpc":"2.0","id":Id,"result":true} would arrive as
        %  RpcSuccess(JsonBoolean(TRUE)) at the client)
        stream_socket_rpc_send_answer(Port,Id,freeval('RpcResult','RpcError',string('not implemented'))),
        test_server_loop(Port)
    ).
| 301 |