본문 바로가기

Back-End

PM2를 사용해 PUSH서버 구축하기

728x90

INDEX

    Stack

    Nodejs,PM2,Javascript,CentOs

     

    Javascript

    Preview

    PUSH 서버는 사용자에게 보낼 메시지 정보를 받아 Google Firebase Cloud Messaging 로 전송하는 중간 브릿지 역할을 합니다.

    CentOs 위에서 Nodejs로 구현되어 있으며 무중단 배포 및 운영을 위해 PM2를 사용했습니다.

    What is PM2 ?

    PM2 is a daemon process manager that will help you manage and keep your application online. Getting started with PM2 is straightforward, it is offered as a simple and intuitive CLI, installable via NPM.

     

    Node.js 애플리케이션은 단일 CPU 코어에서 실행되기 때문에 CPU의 멀티코어 시스템은 사용할 수 없습니다. 만약 보유하고 있는 서버의 사양이 8 코어이며 하이퍼스레딩을 지원한다면 최대 16개 코어를 사용할 수 있는데요. 모든 코어를 사용해 최대 성능을 내지 못하고 오직 한 개의 코어만 사용해야 한다면 주어진 자원을 제대로 활용하지 못하는 꼴이 됩니다. Node.js는 이런 문제를 해결하기 위해 클러스터(Cluster) 모듈을 통해 단일 프로세스를 멀티 프로세스(Worker)로 늘릴 수 있는 방법을 제공합니다.  -Line Engineering 참조

     

    한마디로 PM2는 JS의 싱글 스레드 단점을 극복하고 멀티 프로세스를 운영함으로써 중단되지 않는 서버를 만들 수 있도록 도와주는 역할을 합니다. 멀티 프로세스가 돌아가고 있다면 한 프로세스가 죽더라도 다른 프로세스가 대체할 수 있기 때문이죠. 하지만 PM2로 운영을 하더라도 소스 자체에 문제가 있어 서버가 다운되는거라면 무중단 서비스는 불가합니다. 계속 프로세스를 내렸다가 올려도 해당 소스를 실행하게 되면 서버가 다운되기 때문입니다.

     

    이번 글에서는 PM2 를 사용하여 Nodejs 서버를 구축하는 과정을 간략하게 담아볼려고 합니다.

     

    Methodology

    Architecture

    AP(원장서버) -> FEP(관제서버) -> PUSH 서버 -> Firebase -> User

     

    고객 수천명 ~ 수만 명의 정보(TokenId)와 메시지를 담은 데이터는 증권사 내부의 서버에서 FEP서버로 전달이 되고 외부(인터넷)와의 통신이 가능한 PUSH 서버로 전달된 다음 Firebase, User에게 순차적으로 전달됩니다.

     

    TokenID는 고객이 사용하는 휴대폰의 고유 deviceID 입니다. MTS에 접속하여 알림 수신 동의를 하면 고객의 DeviceID를 추출해 서버로 보내어 저장해 둡니다.

     

    Code

    package.json을 아래와 같이 작성하여 npm run start를 실행하면 ecosystem.config.js 파일을 실행하도록 합니다.

    /* package.json */
    
    "scripts": { 
        "test": "echo \"Error: no test specified\" && exit 1",
        "start": "pm2 start ./ecosystem.config.js --watch",
        "list": "pm2 list",
        "stop": "pm2 stop all",
        "reload": "pm2 reload all"
      },
    /* ecosystem.config.js */
    
    module.exports = {
        apps: [{
            name: 'single01transmission',
            script: './bin/main.js',
            args: ["__IP__", "__PORT__"],
            instances: 1,
        },
        ....

    ecosystem.config.js 에는 각각의 인스턴스가 소켓 통신을 맺을 IP와 PORT를 arguments로 전달합니다. 

    *총 16개의 인스턴스(single 8, multi 8)를 가동합니다. 

    *PUSH는 고객 한 명 한 명에게 전달되는 단건(single) PUSH와 여러 고객에게 한 번에 전송되는 다건(multi) PUSH가 있기에 인스턴스를 분할합니다.

    /* main.js */
    const ServerModule = include(`Serverhttp`);
    //서버 시작
    FTRACE.log(__filename, "--------------------------------------");//로깅모듈에서 구현한 log함수를 사용해서 로깅처리
    FTRACE.log(__filename, `- Server Running...`);
    FTRACE.log(__filename, "--------------------------------------");

    ServerModule.runServer(process.argv[2], process.argv[3]); // 메인 실행 모듈에 서버 IP와 PORT를 인자로 넘겨준다

    웹서버 모듈

    관리자 페이지를 띄우기 위한 웹서버(express.js)와 http 통신을 위한 Socket 기능을 구현합니다.

    root 디렉터리 bin 폴더 안에 index.html을 비롯한 프런트 단 파일들을 위치합니다. 

    브라우저에서 서버 IP와 PORT로 접속을 하면 Index.html을 볼 수 있고 router는 Routes 경로의 index.js에서 구현했습니다.

    /* Serverhttp.js */
    const FTrace        = include('JTrace.js'); //로깅을 위한 모듈
    const NodeExpress   = require('express');
    const NodeApp       = NodeExpress();
    var http_server     = null;
    var webSocketServer = null;
    var httpWss         = null;
    const wsModule      = require("ws");

    const http_execute = (runFepServerFunc) => 
    {
        try{
            NodeApp.use(NodeExpress.static(__dirname + '/bin'));
            NodeApp.set('view engine', 'ejs');
            NodeApp.engine('html', require('ejs').renderFile);
            NodeApp.use(bodyParser.json());
            NodeApp.use(bodyParser.urlencoded({extended:true}));
            var indexRouter = require('./routes/index');
       
            NodeApp.use(indexRouter); //관리자 페이지 Router
       
            // catch 404 and forward to error handler
            NodeApp.use((req, res, next) =>{
              res.status(404).send('Not Found');
            });
       
            // 서버 실행
            http_server = NodeApp.listen(JConfig.http_port, function ()
            {
                FTrace.log(__filename, `listening at ${JConfig.http_port}`);
            });
            webSocketServer = new wsModule.Server(
                {
                    server : http_server
                }
            );
            webSocketServer.on("connection", (ws, request)=>{
                if(ws.readyState == ws.OPEN){
                    httpWss= ws;
                    httpWss.send("socket connected");
                    console.log(process.argv);
                }
            });
        }catch(err){
            FTrace.log(err);
        }finally{
            runFepServerFunc();
        }
    }

    *index.js 에서는 화면 js 파일에서 ajax로 접속해 통신이 가능하도록 api 소스를 작성했습니다. 

     

     

    FEP 통신을 위한 모듈

    FEP로부터는 버퍼(buffer) 데이터가 수신됩니다. 우선 소켓을 붙여야 합니다.

    const Net = require('net'); 
    const FService = require('./routes/service.js'); //데이터 처리를 위한 모듈
    const FTrace = include('JTrace.js'); //로깅을 위한 모듈

    //  fep
    const fep_execute = () => {
      console.log("FEP connection start");
      var tsocket = new Net.Socket();

      var packet_buffdata = null;
      connecttag(tsocket, function (result, err) {
        if (result) {
          packet_buffdata = FService.connectSend(JConfig.con);
          FService.sendw(tsocket, packet_buffdata);
          //FEP에는 bufferdata를 전송해야 하기 때문에
          //json 데이터를 buffer data 로 변환하는 모듈(FPacket.js)이 존재.
        } else {
          FTrace.log(err);
        }
      });
    }

    // socket
    var increase_length = 0, standard_length = 0;
    const connecttag = (tsocket, callback) =>
    {
        tsocket.connect({ host: JConfig.host, port : JConfig.port}, function() {
            callback(true, null); //connection 이 성공하면 FEP에 성공 시그널을 보냄
        });
        const nowTime = new Date().toISOString().slice(0,10)+" "+new Date().toLocaleTimeString('en-GB');
        tsocket.on('data', function(data) {
            // const insertquery = "Insert into sessionTBL values ('"+process.env.name + "', '1', '"+nowTime+
            // "') ON DUPLICATE KEY UPDATE Instance_Status='1', Connected_Time='"+nowTime+"';";
            // sendQuery(insertquery,(res)=>console.log(res));
            if(httpWss) {
                httpWss.send("socket listening"); //Http소켓에 데이터 Send(listening signal)
            }
            if(JConfig.log_check <= 1)
            {
                FTrace.log('FEP receive', data.length);this.pushLog = [];
               
                //FTRACE.log('datarecv중요', data.toString());
            }
            FService.onRecv_Data(data, JpacketTotal_length, Jcdata, increase_length, standard_length, tsocket,httpWss);
        });

        tsocket.on('end', function()
        {  
            FTrace.log('Client disconnected');
        });

        tsocket.on('error', function(err)
        {
            callback(false, JSON.stringify(err));
        });

        tsocket.on('timeout', function()
        {
            FTrace.log('Socket Timed Out');
        });

        tsocket.on('close', function()
        {
            FService.reconnect(tsocket, callback, connecttag);
        });
    }

    FEP Data To FCM

    FEP에서 받은 데이터는 메시지를 보낸 사용자들의 tokenID 값과 메시지 등의 내용이 담겨있습니다.

    이 데이터를 JSON 데이터로 변환하여 FCM(Firebase Cloud Messaging) 서버까지 전송을 해야 하는데 이 과정은 service.js 모듈과 FPacket.js 모듈에서 처리합니다.

    *Github 소스 참고. https://github.com/Shinsieon/Push-server-NodeJs/blob/main/README.md

    //정제된 데이터(pushData) 를 object 에 담아 FCM과 통신하는 모듈에 파라미터로 전달
    const pushsend = (pushData, callback)=> {
      var count, gun, pushcount;
      var errcode = FERROR.getError(pushData.Phead.P_ResponseCode);
      if (errcode.code !== null) {
        gun = "9";
      } else if (pushData.Phead.P_TransactionType === "D") {
        for (count = 0; count < pushData.Phead.P_RecordCount; count++) {
          var msgs = [];
          for (
            pushcount = 0; pushcount < pushData.Dhead[count].Dmsg.send_cnt;pushcount++) {
            var msg_push = {};
            msg_push = pushData.Dhead[count].Data[pushcount];
            if (JConfig.type_check === "0") msg_push["type"] = "1";
            msg_push["msg_title"] = pushData.Dhead[count].Dmsg.msg_title;
            msg_push["msg_contents"] = pushData.Dhead[count].Dmsg.msg_contents;
            var args = {
              push_grp_id: pushData.Dhead[count].Dmsg.push_grp_id,
              device_id: msg_push.device_id,
              csno: msg_push.csno,
              rcms_cnrp_name: msg_push.rcms_cnrp_name,
              type: msg_push.type,
              msg_title: msg_push.msg_title,
              msg_contents: msg_push.msg_contents,
            };
            msgs.push(args);
          }
          fcmServer.sendToFCM_Recursive(msgs, 0);
        }
        gun = "2";
      } else if (pushData.Phead.P_TransactionType === "A") {
        gun = "1";
      } else {
        gun = "0";
      }
      callback(pushData, gun);
    }

     

    Send Query To DataBase

    FEP로부터 데이터 수신, 정제를 한 뒤 고객고유번호(csno)를 primary Key로 해서 MariaDB에 insert 합니다. 이후 FCM으로부터 response를 받으면 Update 해줍니다.


    const pool = mariadb.createPool({
        host : JConfig.db_host,
        port : 3306,
        user : JConfig.db_user,
        password : JConfig.db_password,
        database : JConfig.db_database,
        connectionLimit : 5,
    });
    async function sendQuery(query,callback){
        let conn;
        try {
            conn = await fetchConn();
            var rows = await get_contacts(conn,query);
            if(callback) callback(rows);
        }catch (err){
            console.log(err);
        }finally{
            if(conn) conn.end();
        }
    }
    async function fetchConn(){
        let conn = await pool.getConnection();
        return conn;
    }
    async function get_contacts(conn,query){
        return await conn.query(query);
    }


    let query = "Insert into db_msg values ";
            result.Dhead[0].Data.map((item, idx) => {
              let values =
                "(NULL, '" +
                result.Dhead[0].Dmsg.sndn_rsev_hour +
                "','" +
                item.csno +
                "','" +
                item.cust_telno +
                "','" +
                result.Dhead[0].Dmsg.uuid_id +
                "','" +
                item.device_id +
                "','" +
                item.msg_title +
                "','" +
                item.msg_contents +
                "','" +
                result.Dhead[0].Dhead.D_servicename +
                "','" +
                result.Dhead[0].Dmsg.push_grp_id +
                "',NULL,'" +
                result.Dhead[0].Dmsg.sndn_rsev_hour +
                "',NULL)" +
                (idx == result.Dhead[0].Data.length - 1 ? ";" : ",");
              query += values;
            });
            server.sendQuery(query);

     

     

     

    Data To USER From FCM

    FcmPush.js 모듈은 FService에서 전달받은 데이터를 FCM에 보내고 성공/실패 값에 따라 DB를 업데이트해줍니다.


    const successFunc = (response, data) => { //성공 response를 받아 DB에 전송값(Y)로 업데이트
      const nowTime =
        new Date().toISOString().slice(0, 10) +
        " " +
        new Date().toLocaleTimeString("en-GB");
      const query =
        "Update db_msg SET fcm_send_yn = 'Y', fcm_sent_time = '" +
        nowTime +
        "' " +
        ", result= '" +
        response +
        "' WHERE push_grp_id = '" +
        data["push_grp_id"] +
        "' and csno= '" +
        data["csno"] +
        "';";
      server.sendQuery(query, (res) => console.log(res));
      if (JConfig.log_check <= 1)
        FTRACE.log(__filename, `Firebase Success: `, response);
    };

    const failFunc = (err, data) => {  //실패 response를 받아 DB에 전송값(N)로 업데이트
      const query =
        "Update db_msg SET fcm_send_yn = 'N', result = '" +
        JSON.stringify(err).substring(0, 200) +
        "' WHERE push_grp_id = '" +
        data["push_grp_id"] +
        "' and csno= '" +
        data["csno"] +
        "';";
      server.sendQuery(query, (res) => console.log(res));
      if (JConfig.log_check <= 1) FTRACE.log(__filename, `Firebase Error: `, err);
    };
    const sendToFCM_Recursive = (data, idx) => { //메시지를 순차적으로 보내기 위해 재귀함수 사용
      if (idx == data.length) return;
      let target_token = data[idx].device_id;
      delete data[idx]["device_id"];

      var message = {
        token: target_token,
        data: data[idx],
        apns: {
          payload: {
            aps: {
              contentAvailable: true,
              alert: {
                title: data[idx]["msg_title"],
                body: data[idx]["msg_contents"],
              },
            },
            badge: 1,
          },
        },
      };
      try {
        Admin.messaging()
          .send(message)
          .then(function (response) {
            successFunc(response, data[idx]);
            sendToFCM_Recursive(data, idx + 1);
          })
          .catch(function (err) {
            failFunc(err, data[idx]);
            sendToFCM_Recursive(data, idx + 1);
          });
      } catch (err) {
        console.log(err);
        return;
      }
    };

     


    Reference

    https://engineering.linecorp.com/ko/blog/pm2-nodejs/