/*
 * $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
 *
 * Copyright (C) 2010 TelTech Systems Inc.
 *
 * This file is part of SEMS, a free SIP media server.
 *
 * SEMS is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version. This program is released under
 * the GPL with the additional exemption that compiling, linking,
 * and/or using OpenSSL is allowed.
 *
 * For a license to use the SEMS software under conditions
 * other than those described here, or to purchase support for this
 * software, please contact iptel.org by e-mail at the following addresses:
 * info@iptel.org
 *
 * SEMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

#include "JsonRPCServer.h"
#include "RpcPeer.h"
#include "JsonRPCEvents.h"
#include "jsonArg.h"

#include "AmEventDispatcher.h"
#include "AmPlugIn.h"
#include "log.h"
#include "AmApi.h"
#include "AmSession.h"
#include "AmUtils.h"

/**
 * Serialize a JSON-RPC 2.0 request (or notification) into peer->msgbuf.
 *
 * For a request (is_notification == false) the peer's req_id counter is
 * incremented and used as the message "id"; if evq_link is non-empty, the
 * pair (evq_link, udata) is stored in peer->replyReceivers under that id so
 * that processMessage() can route the reply.
 *
 * @param evq_link        event queue to receive the reply; may be empty
 * @param method          value of the "method" member
 * @param params          value of the "params" member
 * @param peer            connection whose msgbuf/msg_size/msg_recv are set
 * @param udata           user data stored alongside evq_link
 * @param is_notification if true, no "id" is generated and nothing is registered
 * @return 0 on success, -3 if the serialized message exceeds MAX_RPC_MSG_SIZE
 */
int JsonRpcServer::createRequest(const string& evq_link, const string& method,
                                 AmArg& params, JsonrpcNetstringsConnection* peer,
                                 const AmArg& udata,
                                 bool is_notification) {
  AmArg rpc_params;
  rpc_params["jsonrpc"] = "2.0";
  rpc_params["method"] = method;
  rpc_params["params"] = params;

  string req_id;
  bool sink_registered = false;
  if (!is_notification) {
    peer->req_id++;
    req_id = int2str(peer->req_id);
    rpc_params["id"] = req_id;

    if (!evq_link.empty()) {
      peer->replyReceivers[req_id] = make_pair(evq_link, udata);
      sink_registered = true;
    }
    DBG("registering reply sink '%s' for id %s\n",
        evq_link.c_str(), req_id.c_str());
  }

  string rpc_params_json = arg2json(rpc_params);
  if (rpc_params_json.length() > MAX_RPC_MSG_SIZE) {
    ERROR("internal error: message exceeded MAX_RPC_MSG_SIZE (%d)\n",
          MAX_RPC_MSG_SIZE);
    // the request is not going to be sent, so no reply can ever arrive:
    // do not leave a stale reply receiver behind
    if (sink_registered)
      peer->replyReceivers.erase(req_id);
    return -3;
  }

  DBG("RPC message: >>%.*s<<\n",
      (int)rpc_params_json.length(), rpc_params_json.c_str());
  memcpy(peer->msgbuf, rpc_params_json.c_str(), rpc_params_json.length());
  peer->msg_size = rpc_params_json.length();
  // set peer connection up for sending
  peer->msg_recv = false;
  return 0;
}

/**
 * Serialize a JSON-RPC 2.0 reply into peer->msgbuf.
 *
 * @param peer     connection whose msgbuf/msg_size are set
 * @param id       value of the "id" member
 * @param result   stored as "error" if is_error is set, as "result" otherwise
 * @param is_error selects between an error reply and a result reply
 * @return 0 on success, -3 if the serialized reply exceeds MAX_RPC_MSG_SIZE
 */
int JsonRpcServer::createReply(JsonrpcNetstringsConnection* peer,
                               const string& id, AmArg& result, bool is_error) {
  AmArg rpc_res;
  rpc_res["id"] = id;
  rpc_res["jsonrpc"] = "2.0";
  if (is_error)
    rpc_res["error"] = result;
  else
    rpc_res["result"] = result;

  string res_s = arg2json(rpc_res);
  if (res_s.length() > MAX_RPC_MSG_SIZE) {
    ERROR("internal error: reply exceeded MAX_RPC_MSG_SIZE (%d)\n",
          MAX_RPC_MSG_SIZE);
    return -3;
  }

  DBG("created RPC reply: >>%.*s<<\n", (int)res_s.length(), res_s.c_str());
  memcpy(peer->msgbuf, res_s.c_str(), res_s.length());
  peer->msg_size = res_s.length();

  return 0;
}

/**
 * Parse and handle one received JSON-RPC message.
 *
 * A message without a string "method" member is treated as a reply: it is
 * wrapped in a JsonRpcResponseEvent and posted to the event queue that was
 * registered for its id in peer->replyReceivers.
 *
 * A message with a string "method" member is a request (has "id") or a
 * notification (no "id"). If the peer has a matching requestReceiver /
 * notificationReceiver set, it is posted there as a JsonRpcRequestEvent;
 * otherwise it is executed via execRpc() and the serialized result is
 * written back into msgbuf.
 *
 * @param msgbuf   in: received message text; out: serialized reply, if any
 * @param msg_size in: length of the received message (used for logging);
 *                 out: length of the reply in msgbuf, or 0 if none is to be sent
 * @param peer     connection the message was received on
 * @return 0 if the message was handled;
 *         -1 on parse error, or when the receiver queue is missing and the
 *            corresponding FL_CLOSE_NO_*_RECV flag is set;
 *         -2 on protocol errors (wrong version, reply without id / result /
 *            error) or when an FL_CLOSE_* flag applies to a reply;
 *         -3 if the reply exceeds MAX_RPC_MSG_SIZE.
 *         NOTE(review): the log messages suggest that callers close the
 *         connection on a negative return - confirm against the caller.
 */
int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
                                  JsonrpcPeerConnection* peer) {
  DBG("parsing message ...\n");
  // example reply: {"jsonrpc": "2.0", "result": 19, "id": 1}
  AmArg rpc_params;
  if (!json2arg(msgbuf, rpc_params)) {
    INFO("Error parsing message '%.*s'\n", (int)*msg_size, msgbuf);
    return -1;
  }

  // the message comes from the peer: verify the type before reading the
  // value as a C string, as is done for "method" and "id" below
  if (!rpc_params.hasMember("jsonrpc") ||
      !isArgCStr(rpc_params["jsonrpc"]) ||
      strcmp(rpc_params["jsonrpc"].asCStr(), "2.0")) {
    INFO("wrong json-rpc version received; only 2.0 supported!\n");
    return -2; // todo: check value, reply with error?
  }

  bool is_request = (rpc_params.hasMember("method") &&
                     isArgCStr(rpc_params["method"]));
  if (!is_request) {
    // todo: move this to peer->processReply(rpc_params);

    // process reply
    if (!rpc_params.hasMember("id") || !isArgCStr(rpc_params["id"])
        || rpc_params["id"] == "") {
      INFO("Error parsing jsonrpc message: no id in reply!\n");
      return -2; // todo: check value, reply with error?
    }
    string id = rpc_params["id"].asCStr();

    std::map<std::string, std::pair<std::string, AmArg > >::iterator rep_recv_q =
      peer->replyReceivers.find(id);
    if (rep_recv_q == peer->replyReceivers.end()) {
      DBG("received reply for unknown request");
      *msg_size = 0;

      // bitwise test of the flag, like every other FL_CLOSE_* check here
      if (peer->flags & JsonrpcPeerConnection::FL_CLOSE_WRONG_REPLY) {
        INFO("closing connection after unknown reply id %s received\n",
             id.c_str());
        return -2;
      }
      return 0;
    }

    const AmArg& udata = rep_recv_q->second.second;
    const string& event_queue_id = rep_recv_q->second.first;

    JsonRpcResponseEvent* resp_ev = NULL;
    if (rpc_params.hasMember("result")) {
      resp_ev = new JsonRpcResponseEvent(false, id, rpc_params["result"], udata);
    } else {
      if (!rpc_params.hasMember("error")) {
        INFO("protocol error: reply does not have error nor result!\n");
        return -2;
      }
      resp_ev = new JsonRpcResponseEvent(true, id, rpc_params["error"], udata);
    }

    resp_ev->connection_id = peer->id;

    bool posted = AmEventDispatcher::instance()->
      post(event_queue_id, resp_ev);
    if (!posted) {
      DBG("receiver event queue does not exist (any more)\n");
      // nobody took the event: release it, as the request path below does
      delete resp_ev;
      peer->replyReceivers.erase(rep_recv_q);
      *msg_size = 0;
      if (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_REPLYLINK) {
        INFO("closing connection where reply link missing");
        return -2;
      }
      return 0;
    }

    DBG("successfully posted reply to event queue\n");
    peer->replyReceivers.erase(rep_recv_q);
    // don't send a reply
    *msg_size = 0;
    return 0;
  }

  // request or notification: an id left empty means "notification" below
  string id;
  bool id_is_int = false;
  if (rpc_params.hasMember("id")) {
    if (isArgCStr(rpc_params["id"])) {
      id = rpc_params["id"].asCStr();
    } else if (isArgInt(rpc_params["id"])) {
      id = int2str(rpc_params["id"].asInt());
      id_is_int = true;
    } else {
      ERROR("incorrect type for jsonrpc id <%s>\n",
            AmArg::print(rpc_params["id"]).c_str());
    }
  } else {
    DBG("received notification\n");
  }

  // send directly to event queue
  if ((id.empty() && !peer->notificationReceiver.empty()) ||
      (!id.empty() && !peer->requestReceiver.empty())) {
    // don't send a reply
    *msg_size = 0;

    string dst_evqueue = id.empty() ?
      peer->notificationReceiver.c_str() : peer->requestReceiver.c_str();

    DBG("directly passing %s to event queue '%s'\n",
        id.empty() ? "notification":"request",
        dst_evqueue.c_str());

    // stays an undefined AmArg if the message carries no "params"
    AmArg params;
    if (rpc_params.hasMember("params")) {
      params = rpc_params["params"];
    }

    JsonRpcRequestEvent* request_ev =
      new JsonRpcRequestEvent(rpc_params["method"].asCStr(), id, params);

    request_ev->connection_id = peer->id;

    bool posted = AmEventDispatcher::instance()->
      post(dst_evqueue, request_ev);

    if (!posted) {
      DBG("%s receiver event queue '%s' does not exist (any more)\n",
          id.empty() ? "notification":"request",
          dst_evqueue.c_str());
      delete request_ev;

      if (id.empty() &&
          (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_NOTIF_RECV)) {
        INFO("closing connection on missing notification receiver queue\n");
        return -1; // todo: reply error?
      }

      if (!id.empty() &&
          (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_REQUEST_RECV)) {
        INFO("closing connection on missing request receiver queue\n");
        return -1; // todo: reply error?
      }
    } else {
      DBG("successfully posted %s to event queue '%s'\n",
          id.empty() ? "notification":"request",
          dst_evqueue.c_str());
    }
    return 0;
  }

  // no receiver queue configured: execute the call here and reply in place
  AmArg rpc_res;
  execRpc(rpc_params, rpc_res);

  if (!id.empty()) {
    if (id_is_int) {
      // give the id back with the type it was received with
      int int_id = 0;
      str2int(id, int_id);
      rpc_res["id"] = int_id;
    } else {
      rpc_res["id"] = id;
    }
  }

  string res_s = arg2json(rpc_res);
  if (res_s.length() > MAX_RPC_MSG_SIZE) {
    ERROR("internal error: reply exceeded MAX_RPC_MSG_SIZE (%d)\n",
          MAX_RPC_MSG_SIZE);
    return -3;
  }
  DBG("RPC result: >>%.*s<<\n", (int)res_s.length(), res_s.c_str());
  memcpy(msgbuf, res_s.c_str(), res_s.length());
  *msg_size = res_s.length();

  return 0;
}

/**
 * Execute the call described by a parsed JSON-RPC request.
 *
 * rpc_params must contain "method" member as string. "params" is optional;
 * "id" is only used if it is a string.
 *
 * @param rpc_params parsed request
 * @param rpc_res    receives the reply structure
 */
void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
  // stays an undefined AmArg if the request carries no "params"
  AmArg params;
  if (rpc_params.hasMember("params")) {
    params = rpc_params["params"];
  }

  string method = rpc_params["method"].asCStr();
  string id;
  if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"]))
    id = rpc_params["id"].asCStr();

  execRpc(method, id, params, rpc_res);
}

/**
 * Execute "<module>.<function>" and fill in the JSON-RPC reply structure.
 *
 * Module "core" is served by runCoreMethod(); any other module is looked up
 * through AmPlugIn::getFactory4Di() and called via AmDynInvoke::invoke().
 * Failures are not propagated to the caller: they are reported as an "error"
 * member with "code", "message" and "data" in rpc_res.
 *
 * @param method  method name in the form module.function
 * @param id      copied into the "id" member of the reply
 * @param params  parameters handed to the invoked function
 * @param rpc_res receives "result" or "error", plus "id" and "jsonrpc"
 */
void JsonRpcServer::execRpc(const string& method, const string& id,
                            const AmArg& params, AmArg& rpc_res) {
  try {
    size_t dot_pos = method.find('.');
    // need a dot that is followed by at least one character
    if (dot_pos == string::npos || dot_pos + 1 >= method.length()) {
      throw JsonRpcError(-32601, "Method not found",
                         "use module.method as rpc method name");
    }
    string factory = method.substr(0, dot_pos);
    string fact_meth = method.substr(dot_pos + 1);

    try {
      if (factory == "core") {
        runCoreMethod(fact_meth, params, rpc_res["result"]);
        rpc_res["id"] = id;
        rpc_res["jsonrpc"] = "2.0";
        return;
      }
      // todo: direct_export

      DBG("searching for factory '%s' method '%s'\n",
          factory.c_str(), fact_meth.c_str());

      AmDynInvokeFactory* fact =
        AmPlugIn::instance()->getFactory4Di(factory);
      if (fact==NULL) {
        throw JsonRpcError(-32601, "Method not found",
                           "module not loaded");
      }
      AmDynInvoke* di_inst = fact->getInstance();
      if(!di_inst) {
        throw JsonRpcError(-32601, "Method not found",
                           "failed to instanciate module");
      }
      di_inst->invoke(fact_meth, params, rpc_res["result"]);
    } catch (const AmDynInvoke::NotImplemented& ni) {
      INFO("not implemented DI function '%s'\n",
           ni.what.c_str());
      throw JsonRpcError(-32601, "Method not found",
                         "function unknown in module");
    } catch (const AmArg::OutOfBoundsException&) {
      INFO("out of bounds in RPC DI call\n");
      throw JsonRpcError(-32602, "Invalid params",
                         "out of bounds in function call");
    } catch (const AmArg::TypeMismatchException&) {
      INFO("type mismatch in RPC DI call\n");
      throw JsonRpcError(-32602, "Invalid params",
                         "parameters type mismatch in function call");
    } catch (const JsonRpcError&) {
      // already in reply form: hand on to the outer handler unchanged
      INFO("JsonRpcError \n");
      throw;
    } catch (...) {
      ERROR("unexpected Exception in RPC DI call\n");
      throw JsonRpcError(-32000, "Server error",
                         "unexpected Exception");
    }

    // todo: notification!
    rpc_res["id"] = id;
    rpc_res["jsonrpc"] = "2.0";
  } catch (const JsonRpcError& e) {
    INFO("got JsonRpcError core %d message '%s'\n",
         e.code, e.message.c_str());
    rpc_res["error"] = AmArg();
    rpc_res["error"]["code"] = e.code;
    rpc_res["error"]["message"] = e.message;
    rpc_res["error"]["data"] = e.data;
    // todo: notification!
    rpc_res["id"] = id;
    rpc_res["jsonrpc"] = "2.0";
    return;
  }

  rpc_res["error"] = AmArg(); // Undef/null
}

/**
 * Built-in functions of the "core" module.
 *
 *  - "calls":        res[0] is set to AmSession::getSessionNum()
 *  - "set_loglevel": log_level is set to params[0], which must be an int
 *  - "get_loglevel": res[0] is set to log_level
 *
 * @param method function name without the "core." prefix
 * @param params call parameters
 * @param res    receives the result
 * @throws JsonRpcError with code -32601 if method is not one of the above
 */
void JsonRpcServer::runCoreMethod(const string& method, const AmArg& params,
                                  AmArg& res) {
  if (method == "calls") {
    res[0] = (int)AmSession::getSessionNum();
  } else if (method == "set_loglevel") {
    assertArgArray(params);
    assertArgInt(params[0]);
    log_level = params[0].asInt();
    DBG("set log_level to %d\n", log_level);
  } else if (method == "get_loglevel") {
    res[0] = log_level;
    DBG("get_log_level returns %d\n", log_level);
  } else {
    throw JsonRpcError(-32601, "Method not found",
                       "function unknown in core");
  }
}