/** * Copyright (C) 2015 Bicom Systems Ltd, (bicomsystems.com) * * Author: Seudin Kasumovic (seudin.kasumovic@gmail.com) * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * Kamailio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #include <stdio.h> #include <stdarg.h> #include <string.h> #include <ei.h> #include "handle_emsg.h" #include "handle_rpc.h" #include "erl_helpers.h" #include "cnode.h" #include "pv_xbuff.h" #include "mod_erlang.h" #include "../../dprint.h" #include "../../sr_module.h" #include "../../cfg/cfg_struct.h" #include "../../fmsg.h" int handle_msg_req_tuple(cnode_handler_t *phandler, erlang_msg * msg); int handle_req_ref_tuple(cnode_handler_t *phandler, erlang_msg * msg); int handle_rpc_response(cnode_handler_t *phandler, erlang_msg * msg, int arity); int handle_rex_call(cnode_handler_t *phandler,erlang_ref_ex_t *ref, erlang_pid *pid); int handle_net_kernel(cnode_handler_t *phandler, erlang_msg * msg); void encode_error_msg(ei_x_buff *response, erlang_ref_ex_t *ref, const char *type, const char *msg, ...); int handle_reg_send(cnode_handler_t *phandler, erlang_msg * msg) { int rt, backup_rt; struct run_act_ctx ctx; sip_msg_t *fmsg; sip_msg_t tmsg; char *route; size_t sz; ei_x_buff *request = &phandler->request; sr_xavp_t *xreq = NULL; sr_xavp_t *xpid = NULL; str msg_name = str_init("msg"); str pid_name = str_init("pid"); sr_xval_t val; sr_data_t *data = NULL; sz = sizeof("erlang")+strlen(msg->toname)+2; route = (char*)pkg_malloc(sz); if (!route) { LM_ERR("not enough memory"); return -1; } snprintf(route,sz,"erlang:%s",msg->toname); rt = route_get(&event_rt, route); if (rt < 0 || event_rt.rlist[rt] == NULL) { LM_WARN("ERL_REG_SEND message to unknown process %s\n", route); pkg_free(route); PRINT_DBG_REG_SEND(phandler->conn.nodename, msg->from, phandler->ec.thisnodename, msg->toname,request); return 0; } LM_DBG("executing registered process %s\n", route); fmsg = faked_msg_next(); memcpy(&tmsg, fmsg, sizeof(sip_msg_t)); fmsg = &tmsg; if ((xreq = pv_xbuff_get_xbuff(&msg_name))) { LM_DBG("free previous $xbuff(msg) value\n"); xavp_destroy_list(&xreq->val.v.xavp); } else { xreq = xbuff_new(&msg_name); } if (!xreq) { LM_ERR("failed to create $xbuff(msg) variable\n"); goto err; } /* decode request into $xbuff(msg) */ xreq->val.type = SR_XTYPE_XAVP; /* XAVP <- ei_x_buff */ if (erl_api.xbuff2xavp(&xreq->val.v.xavp,request)){ LM_ERR("failed to decode message\n"); goto err; } if ((xpid = pv_xbuff_get_xbuff(&pid_name))) { LM_DBG("free previous $xbuff(pid) value\n"); xavp_destroy_list(&xpid->val.v.xavp); } else { xpid = xbuff_new(&pid_name); } if (!xpid) { LM_ERR("failed to create $xbuff(pid) variable\n"); goto err; } /* put erl_pid into $xbuff(pid) */ data = (sr_data_t*)shm_malloc(sizeof(sr_data_t)+sizeof(erlang_pid)); if (!data) { LM_ERR("not enough shared memory\n"); goto err; } memset((void*)data,0,sizeof(sr_data_t)+sizeof(erlang_pid)); data->p = (void*)data+sizeof(sr_data_t); data->pfree = xbuff_data_free; memcpy(data->p,(void*)&msg->from,sizeof(erlang_pid)); val.type = SR_XTYPE_DATA; val.v.data = data; xpid->val.v.xavp = xavp_new_value(&pid_name,&val); if (!xpid->val.v.xavp) { LM_ERR("failed to create xavp!\n"); goto err; } xpid->val.type = SR_XTYPE_XAVP; /* registered process reply to from */ cnode_reply_to_pid = &msg->from; backup_rt = get_route_type(); set_route_type(EVENT_ROUTE); init_run_actions_ctx(&ctx); run_top_route(event_rt.rlist[rt], fmsg, &ctx); set_route_type(backup_rt); pkg_free(route); free_xbuff_fmt_buff(); xavp_destroy_list(xavp_get_crt_list()); return 0; err: shm_free(data); pkg_free(route); free_xbuff_fmt_buff(); xavp_destroy_list(xavp_get_crt_list()); return -1; } /* * handle ERL_SEND */ int handle_send(cnode_handler_t *phandler, erlang_msg * msg) { int rt, backup_rt; struct run_act_ctx ctx; sip_msg_t *fmsg; sip_msg_t tmsg; char route[]="erlang:self"; ei_x_buff *request = &phandler->request; sr_xavp_t *xreq = NULL; str msg_name = str_init("msg"); rt = route_get(&event_rt, route); if (rt < 0 || event_rt.rlist[rt] == NULL) { LM_WARN("ERL_SEND message not handled, missing event route %s\n", route); PRINT_DBG_REG_SEND(phandler->conn.nodename, msg->from, phandler->ec.thisnodename, msg->toname,request); return 0; } LM_DBG("executing self process %s\n", route); fmsg = faked_msg_next(); memcpy(&tmsg, fmsg, sizeof(sip_msg_t)); fmsg = &tmsg; if ((xreq = pv_xbuff_get_xbuff(&msg_name))) { LM_DBG("free previous value\n"); xavp_destroy_list(&xreq->val.v.xavp); } else { xreq = xbuff_new(&msg_name); } if (!xreq) { LM_ERR("failed to create $xbuff(msg) variable\n"); goto err; } /* decode request into $xbuff(msg) */ xreq->val.type = SR_XTYPE_XAVP; /* XAVP <- ei_x_buff */ if (erl_api.xbuff2xavp(&xreq->val.v.xavp,request)){ LM_ERR("failed to decode message\n"); goto err; } backup_rt = get_route_type(); set_route_type(EVENT_ROUTE); init_run_actions_ctx(&ctx); run_top_route(event_rt.rlist[rt], fmsg, &ctx); set_route_type(backup_rt); free_xbuff_fmt_buff(); xavp_destroy_list(xavp_get_crt_list()); return 0; err: free_xbuff_fmt_buff(); xavp_destroy_list(xavp_get_crt_list()); return -1; } int handle_req_ref_tuple(cnode_handler_t *phandler, erlang_msg * msg) { erlang_ref ref; erlang_pid pid; int arity; ei_x_buff *request = &phandler->request; ei_x_buff *response = &phandler->response; ei_decode_tuple_header(request->buff, &request->index, &arity); if (ei_decode_ref(request->buff, &request->index, &ref)) { LM_WARN("Invalid reference.\n"); return -1; } if (ei_decode_pid(request->buff, &request->index, &pid)) { LM_ERR("Invalid pid in a reference/pid tuple\n"); return -1; } if (0) { ei_x_encode_atom(response, "ok"); } else { ei_x_encode_tuple_header(response, 2); ei_x_encode_atom(response, "error"); ei_x_encode_atom(response, "not_found"); } return -1; } /* catch the response to ei_rpc_to (which comes back as {rex, {Ref, Pid}} The {Ref,Pid} bit can be handled by handle_ref_tuple */ int handle_rpc_response(cnode_handler_t *phandler, erlang_msg * msg, int arity) { int type, size, arity2, tmpindex; ei_x_buff *request = &phandler->request; ei_get_type(request->buff, &request->index, &type, &size); switch (type) { case ERL_SMALL_TUPLE_EXT: case ERL_LARGE_TUPLE_EXT: tmpindex = request->index; ei_decode_tuple_header(request->buff, &tmpindex, &arity2); return handle_req_ref_tuple(phandler, msg); default: LM_ERR("Unknown RPC response.\n"); break; } /* no reply */ return -1; } int handle_msg_req_tuple(cnode_handler_t *phandler, erlang_msg * msg) { char tupletag[MAXATOMLEN]; int arity; int ret = 0; ei_x_buff *request = &phandler->request; ei_decode_tuple_header(request->buff, &request->index, &arity); if (ei_decode_atom(request->buff, &request->index, tupletag)) { LM_ERR("error: badarg\n"); } else { if (!strncmp(tupletag, "rex", MAXATOMLEN)) { ret = handle_rpc_response(phandler, msg, arity); } else { LM_ERR("error: undef\n"); } } return ret; } /* respond on net_adm:ping * e.g. message: * * {'$gen_call', {<tbe1@tbe.lan.343.0>, #Ref<194674.122.0>}, {is_auth, 'tbe1@tbe.lan'}} for net_kernel */ int handle_net_kernel(cnode_handler_t *phandler, erlang_msg * msg) { int version, size, type, arity; char atom[MAXATOMLEN]; erlang_ref ref; erlang_pid pid; ei_x_buff *request = &phandler->request; ei_x_buff *response = &phandler->response; /* start from first arg */ request->index = 0; ei_decode_version(request->buff, &request->index, &version); ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); return -1; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 3) { LM_ERR("wrong arity\n"); return -1; } if (ei_decode_atom(request->buff, &request->index, atom) || strncmp(atom, "$gen_call", MAXATOMLEN)) { LM_ERR("not atom '$gen_call'\n"); return -1; } ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); return -1; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 2) { LM_ERR("wrong arity\n"); return -1; } if (ei_decode_pid(request->buff, &request->index, &pid) || ei_decode_ref(request->buff, &request->index, &ref)) { LM_ERR("decoding pid and ref error\n"); return -1; } ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); return -1; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 2) { LM_ERR("bad arity\n"); return -1; } if (ei_decode_atom(request->buff, &request->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) { LM_ERR("not is_auth\n"); return -1; } /* To ! {Tag, Reply} */ ei_x_encode_tuple_header(response, 2); ei_x_encode_ref(response, &ref); ei_x_encode_atom(response, "yes"); ei_x_print_msg(response, &pid, 1); ei_send_tmo(phandler->sockfd, &pid, response->buff, response->index, CONNECT_TIMEOUT); return -1; } int erlang_whereis(cnode_handler_t *phandler,erlang_ref_ex_t *ref, erlang_pid *pid) { ei_x_buff *response = &phandler->response; ei_x_buff *request = &phandler->request; char route[sizeof("erlang:")+MAXATOMLEN] = "erlang:"; int arity; int type; int rt; ei_decode_list_header(request->buff,&request->index,&arity); if (arity != 1) { response->index = 1; encode_error_msg(response, ref, "badarith", "undefined function erlang:whereis/%d",arity); return 0; } ei_get_type(request->buff,&request->index,&type,&arity); if (type != ERL_ATOM_EXT) { response->index = 1; encode_error_msg(response, ref, "badarg", "bad argument"); return 0; } ei_decode_atom(request->buff,&request->index,route+sizeof("erlang:")-1); rt = route_get(&event_rt, route); if (rt < 0 || event_rt.rlist[rt] == NULL) { LM_WARN("can't find pseudo process %s\n", route); ei_x_encode_atom(response,"undefined"); return 0; } ei_x_encode_pid(response,&phandler->ec.self); return 0; } static int handle_erlang_calls(cnode_handler_t *phandler,erlang_ref_ex_t *ref, erlang_pid *pid, const char *method) { ei_x_buff *response = &phandler->response; if (strcmp(method,"whereis")==0) { return erlang_whereis(phandler,ref,pid); } else { response->index = 1; encode_error_msg(response, ref, "badrpc", "Method Not Found"); } return 0; } /* handle rex calls * * example: * * {call, tbe, dlg_bye, [123, 456], <tbe1@tbe.lan.31.0>} * */ int handle_rex_call(cnode_handler_t *phandler,erlang_ref_ex_t *ref, erlang_pid *pid) { char module[MAXATOMLEN]; char method[MAXATOMLEN]; char proc[2*MAXATOMLEN]; erl_rpc_ctx_t ctx; rpc_export_t* exp; int arity; ei_x_buff *request = &phandler->request; ei_x_buff *response = &phandler->response; int size, type; /* already decoded {call, * continue with * module,method...} */ ei_get_type(request->buff,&request->index,&type,&size); #ifdef ERL_SMALL_ATOM_EXT if (type == ERL_ATOM_EXT || type == ERL_SMALL_ATOM_EXT) { #else if (type == ERL_ATOM_EXT) { #endif if (ei_decode_atom(request->buff,&request->index,module)) { encode_error_msg(response, ref, "error", "Failed to decode module name"); return 0; } } else if (ei_decode_strorbin(request->buff, &request->index, MAXATOMLEN, module)) { encode_error_msg(response, ref, "error", "Failed to decode module name"); return 0; } ei_get_type(request->buff,&request->index,&type,&size); #ifdef ERL_SMALL_ATOM_EXT if (type == ERL_ATOM_EXT || type == ERL_SMALL_ATOM_EXT) { #else if (type == ERL_ATOM_EXT) { #endif if (ei_decode_atom(request->buff,&request->index,method)) { encode_error_msg(response, ref, "error", "Failed to decode method name"); return 0; } } else if (ei_decode_strorbin(request->buff, &request->index, MAXATOMLEN, method)) { encode_error_msg(response, ref, "error", "Failed to decode method name"); return 0; } if (strcmp(module,"erlang") == 0) { /* start encoding */ ei_x_encode_tuple_header(response, 2); if (ref->with_node) { ei_x_encode_tuple_header(response,2); ei_x_encode_ref(response, &ref->ref); ei_x_encode_atom(response,ref->nodename); } else { ei_x_encode_ref(response, &ref->ref); } return handle_erlang_calls(phandler,ref,pid,method); } /* be up to date with cfg */ cfg_update(); sprintf(proc,"%s.%s",module,method); exp=find_rpc_export(proc,0); if (!exp || !exp->function) { encode_error_msg(response, ref, "badrpc", "Method Not Found"); return 0; } ei_get_type(request->buff,&request->index,&type,&size); /* open list for decoding */ if (ei_decode_list_header(request->buff,&request->index,&arity)) { LOG(L_ERR, "Expected list of parameters type=<%c> arity=<%d>\n", type, size); encode_error_msg(response, ref, "badarith", "Expected list of parameters."); return 0; } /* start encoding */ ei_x_encode_tuple_header(response, 2); if (ref->with_node) { ei_x_encode_tuple_header(response,2); ei_x_encode_ref(response, &ref->ref); ei_x_encode_atom(response,ref->nodename); } else { ei_x_encode_ref(response, &ref->ref); } /* init context */ ctx.phandler = phandler; ctx.pid = pid; ctx.ref = ref; ctx.response_sent = 0; ctx.request = request; ctx.request_index = request->index; ctx.response = response; ctx.reply_params = 0; ctx.tail = 0; ctx.fault = 0; ctx.fault_p = &ctx.fault; ctx.optional = 0; ctx.no_params = 0; ctx.response_index = response->index; ctx.size = arity; /* call rpc */ rex_call_in_progress = 1; exp->function(&erl_rpc_func_param,(void*)&ctx); rex_call_in_progress = 0; if (ctx.no_params) { ei_x_encode_list_header(response,ctx.no_params); } if (erl_rpc_send(&ctx, 0)) { response->index = ctx.response_index; ei_x_encode_atom(response, "error"); ei_x_encode_tuple_header(response,2); ei_x_encode_string(response, "Inernal Error: Failed to encode reply"); } else { ei_x_encode_empty_list(response); } empty_recycle_bin(); /* we sent response so it's false for up calls */ return 0; } /* {'$gen_call', {<tbe1@tbe.lan.14488.1>, #Ref<62147.65.0>}, {call, tbe, dlg_bye, [123, 456], <tbe1@tbe.lan.31.0>}} for rex */ int handle_rex_msg(cnode_handler_t *phandler, erlang_msg * msg) { int version, size, type, arity; char atom[MAXATOMLEN]; erlang_ref_ex_t ref; erlang_pid pid; ei_x_buff *request = &phandler->request; ei_x_buff *response = &phandler->response; /* start from first arg */ request->index = 0; ei_decode_version(request->buff, &request->index, &version); ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); return -1; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 3) { LM_ERR("wrong arity %d\n", arity); return -1; } if (ei_decode_atom(request->buff, &request->index, atom) || strncmp(atom, "$gen_call", MAXATOMLEN)) { LM_ERR("not $gen_call\n"); return -1; } ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); goto err; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 2) { LM_ERR("wrong arity\n"); goto err; } if (ei_decode_pid(request->buff, &request->index, &pid) ) { LM_ERR("decoding pid error\n"); goto err2; } ref.with_node = 0; /* we can got with host accompanied {#Ref<18224.110.0>, 'reg-two@pbx.lan'} */ ei_get_type(request->buff, &request->index, &type, &size); if (type == ERL_REFERENCE_EXT || type == ERL_NEW_REFERENCE_EXT) { if (ei_decode_ref(request->buff, &request->index, &ref.ref)) { LM_ERR("decoding ref error\n"); goto err2; } } else if (type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple type {#Ref<x.y.z>, 'node@host.tld'} %c\n", type); goto err; } else { if (ei_decode_tuple_header(request->buff, &request->index, &arity) || arity !=2) goto err2; if (ei_decode_ref(request->buff, &request->index, &ref.ref)) { LM_ERR("decoding ref error\n"); goto err2; } if (ei_decode_atom(request->buff, &request->index, ref.nodename)) { LM_ERR("decoding node in ref error\n"); goto err2; } ref.with_node = 1; } ei_get_type(request->buff, &request->index, &type, &size); if (type != ERL_SMALL_TUPLE_EXT) { LM_ERR("not a tuple\n"); goto err; } ei_decode_tuple_header(request->buff, &request->index, &arity); if (arity != 5) { LM_ERR("bad arity %d\n", arity); goto err; } if (ei_decode_atom(request->buff, &request->index, atom) == 0 && strncmp(atom, "call", MAXATOMLEN) == 0) { return handle_rex_call(phandler, &ref, &pid); } err: /* To ! {Tag, Reply} */ ei_x_encode_tuple_header(response, 2); if (ref.with_node) { ei_x_encode_tuple_header(response,2); ei_x_encode_ref(response, &ref.ref); ei_x_encode_atom(response,ref.nodename); } else { ei_x_encode_ref(response, &ref.ref); } ei_x_encode_tuple_header(response,2); ei_x_encode_atom(response, "badrpc"); ei_x_encode_string(response, "Unsupported rex request."); ei_x_print_msg(response, &pid, 1); ei_send_tmo(phandler->sockfd, &pid, response->buff, response->index, CONNECT_TIMEOUT); err2: return -1; } int handle_erlang_msg(cnode_handler_t *phandler, erlang_msg * msg) { int type, type2, size, version, arity, tmpindex; int ret = 0; ei_x_buff * request = &phandler->request; ei_x_buff * response = &phandler->response; erlang_pid from; if (msg->msgtype == ERL_REG_SEND ) { cnode_reply_to_pid = &msg->from; if (!strncmp(msg->toname, "net_kernel",MAXATOMLEN)) { /* respond to ping stuff */ ret = handle_net_kernel(phandler, msg); } else if (!strncmp(msg->toname, "rex",MAXATOMLEN)) { /* respond to rex stuff */ ret = handle_rex_msg(phandler, msg); } else { /* try registered process */ handle_reg_send(phandler,msg); } } else if (msg->msgtype == ERL_SEND) { ret = handle_send(phandler, msg); } else { /* TODO: fix below after adding #Pid and #Ref in PVs */ request->index = 0; ei_decode_version(request->buff, &request->index, &version); ei_get_type(request->buff, &request->index, &type, &size); switch (type) { case ERL_SMALL_TUPLE_EXT: case ERL_LARGE_TUPLE_EXT: tmpindex = request->index; ei_decode_tuple_header(request->buff, &tmpindex, &arity); ei_get_type(request->buff, &tmpindex, &type2, &size); switch (type2) { case ERL_ATOM_EXT: ret = handle_msg_req_tuple(phandler, msg); break; case ERL_REFERENCE_EXT: case ERL_NEW_REFERENCE_EXT: ret = handle_req_ref_tuple(phandler, msg); break; case ERL_PID_EXT: ei_decode_pid(request->buff,&tmpindex,&from); ret = handle_send(phandler, msg); break; default: LM_ERR("nothing to do with term type=<%d> type2=<%d> -- discarding\n", type, type2); break; } break; default: LM_ERR("not handled term type=<%d> size=<%d> -- discarding\n", type, size); break; } } if (ret) { /* reset pid */ cnode_reply_to_pid = NULL; return ret; } else if (response->index > 1 && cnode_reply_to_pid) { ei_x_print_msg(response, cnode_reply_to_pid, 1); if (ei_send(phandler->sockfd, cnode_reply_to_pid, response->buff, response->index)) { LM_ERR("ei_send failed on node=<%s> socket=<%d>, %s\n", phandler->ec.thisnodename,phandler->sockfd, strerror(erl_errno)); } } else { LM_DBG("** no reply **\n"); } /* reset pid */ cnode_reply_to_pid = NULL; return 0; } void encode_error_msg(ei_x_buff *response, erlang_ref_ex_t *ref, const char *type, const char *msg, ... ) { char buffer[256]; va_list args; va_start (args, msg); vsnprintf (buffer, 255, msg, args); va_end (args); ei_x_encode_tuple_header(response, 2); if (ref->with_node) { ei_x_encode_tuple_header(response,2); ei_x_encode_ref(response, &ref->ref); ei_x_encode_atom(response,ref->nodename); } else { ei_x_encode_ref(response, &ref->ref); } ei_x_encode_tuple_header(response,2); ei_x_encode_atom(response, type); ei_x_encode_string(response, buffer); }