ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Node.js] Stream과 Back Pressure
    TIL 2024. 10. 24. 19:02

     
    학습 키워드: Node.js, Stream, Back Pressure, Socket
     

    1. Stream

    1) What is it?:

     연속적인 데이터 흐름을 처리하는 방식으로, 한 번에 모든 데이터를 메모리에 올리지 않고, 작은 단위의 청크(Chunk)로 나누어서 처리하여 메모리 사용을 최소화 함으로써 성능 최적화를 달성하는 방법이다. Node.js에서 스트림은 크게 다음의 네 가지로 나뉜다.

    1. Readable Stream: 데이터를 읽을 수 있는 스트림 (File System의 read stream이나 HTTP 요청). 
    2. Writable Stream: 데이터를 쓸 수 있는 스트림 (File System의 write stream이나 HTTP 응답)
    3. Duplex Stream: 읽기와 쓰기가 모두 가능한 스트림 (TCP 소켓)
    4. Transform Stream: 압축이나 암호화 등 데이터를 변환하면서 읽고 쓰는 것이 가능한 스트림

     모든 스트림에서 공통적으로 주의해야 하는 점으로 백프레셔(Back Pressure) 관리가 있다. 백프레셔란 데이터 스트림에서 읽기/쓰기 속도의 차이로 인해 메모리 버퍼가 가득 차 시스템 성능 저하와 메모리 사용량 급증 등의 문제가 발생하는 것을 말한다. 네트워킹으로 치면 흐름 제어(flow control)가 필요한 상황이다.
     
     

    2) Readable Stream:

     Readable Stream은 대용량의 데이터를 청크 단위로 나누어 읽음으로써 메모리 사용을 최소화하고 데이터 처리 효율을 높일 수 있다. Node.js에서의 예시로는 파일 시스템(fs)의 createReadStream()를 통해 파일을 스트리밍 할 수 있으며, 이렇게 열린 스트림에 .on()으로 data, end 및 error 등의 이벤트와 콜백 함수를 등록하여 청크를 처리한다. 데이터 이벤트의 경우 콜백의 인자로 chunk를 받게되며, 에러 이벤트는 error를 받는다. 이전 프로젝트들에서는 주로 Express를 사용했어서 사용할 일이 없었지만, 기본 모듈 http를 이용하면 http 서버를 생성하고, 마찬가지로 .on을 통해 이벤트를 등록할 수 있다.
     

    const fs = require('fs');
    
    // 파일을 읽기 스트림으로 처리
    const readableStream = fs.createReadStream('example.txt', { encoding: 'utf8' });
    
    // data 이벤트로 읽은 데이터를 처리
    readableStream.on('data', (chunk) => {
        console.log('청크 데이터:', chunk);
    });
    
    // end 이벤트로 읽기 완료 확인
    readableStream.on('end', () => {
        console.log('파일 읽기가 완료되었습니다.');
    });
    
    // 에러 처리
    readableStream.on('error', (err) => {
        console.error('읽기 중 에러 발생:', err);
    });

     
     데이터를 너무 빠르게 읽어오면 앞서 설명한 백프레셔 문제가 발생할 수 있기 때문에, 읽기 속도를 적절하게 제어할 필요가 있으며, 이러한 상황에서 readable 이벤트가 유용하게 사용될 수 있다. 기본적으로 data 이벤트는 읽기가 준비되면 자동으로 읽어서 콜백의 인자로 데이터 청크를 넘겨주지만, readable 이벤트를 사용하면 데이터 읽기 준비 상태가 되었을 때 콜백이 발생하고 콜백 내부에서 읽기를 직접 구현할 수 있다.
     

    // 'readable' 이벤트 리스너
    readableStream.on('readable', () => {
      console.log('읽을 수 있는 데이터가 준비되었습니다.');
      let chunk;
      while (null !== (chunk = readableStream.read())) {
        console.log(`읽은 데이터: ${chunk}`);
      }
    });

     
     스트림 자체를 직접 중단/재개하는 pause/resume 메서드도 있는데, pause로 스트림을 중단하면 data 이벤트가 발생하지 않게 되며, resume은 pause 상태를 해제한다.
     
     

    3) Writable Stream:

     Node.js의 Writable Stream은 파일에 데이터를 쓰거나 HTTP 응답을 전송할 때 사용한다. 스트림은 비동기로 작동하며, 데이터가 준비되면 즉시 쓰기를 시작한다.

     

    const fs = require('fs');
    
    // 파일 쓰기 스트림 생성
    const writableStream = fs.createWriteStream('output.txt');
    
    // 스트림에 데이터 쓰기
    writableStream.write('첫 번째 줄입니다.\n');
    writableStream.write('두 번째 줄입니다.\n');
    
    // 만약 스트림이 처리할 수 있는 용량을 초과하면, false를 반환
    if (!writableStream.write('세 번째 줄입니다.\n')) {
        // 처리할 수 없는 경우, 'drain' 이벤트로 스트림이 다시 비워졌을 때 쓰기 재개
        writableStream.once('drain', () => {
            console.log('스트림이 비워졌습니다. 다시 데이터를 쓸 수 있습니다.');
            writableStream.write('네 번째 줄입니다.\n');
        });
    }
    
    // 모든 데이터 쓰기가 완료되었을 때 스트림 종료
    writableStream.end('마지막 줄입니다.\n');
    
    // 스트림이 끝났을 때 'finish' 이벤트 발생
    writableStream.on('finish', () => {
        console.log('모든 데이터가 파일에 기록되었습니다.');
    });
    
    // 에러 처리
    writableStream.on('error', (err) => {
        console.error('파일 쓰기 중 오류 발생:', err);
    });

     
     쓰기 스트림 역시 데이터를 너무 빠르게 쓰게 되면 백프레셔가 발생할 수 있다. 이를 방지하기 위해 drain 이벤트를 사용할 수 있다. 쓰기를 담당하는 write 메서드는 버퍼 쓰기 성공 여부에 따라 bool을 반환하는데, false를 반환한 경우는 버퍼가 가득 차있는 백프레셔가 발생한 상태를 의미한다. 따라서 false 시 쓰기를 중단하고, writable.once를 사용해 drain 이벤트 발생 시 재개하도록 코드를 작성하여 메모리 문제로 시스템 상태가 불안정해지는 것을 방지할 수 있다.
     
     

    4) Duplex Stream:

     Duplex Stream은 읽기와 쓰기가 모두 가능한 형태의 스트림이다. 이전에 진행했던 프로젝트들에서 사용한 net 모듈의 TCP 소켓같은 양방향 통신에 이 기능이 포함된다. Duplex stream은 데이터를 읽고 쓰는 기능이 있는 추상화된 스트림이며, 네트워크 통신은 duplex 스트림을 상속받은 net socket의 기능이다. 아래의 예시에서 socket.on을 통한 이벤트 등록으로 읽기, socket.write를 통한 쓰기를 모두 진행하는 것을 볼 수 있다.

     

    const net = require('net');
    
    // TCP 서버 생성
    const server = net.createServer((socket) => {
        // socket은 Duplex Stream을 상속받은 객체임
        socket.on('data', (data) => {
            console.log('클라이언트로부터 받은 데이터:', data.toString());
            socket.write('서버에서 응답합니다.');
        });
    
        socket.on('end', () => {
            console.log('클라이언트 연결 종료');
        });
    });
    
    // 서버 실행
    server.listen(3000, () => {
        console.log('서버가 3000번 포트에서 실행 중입니다.');
    });

     
     Duplex stream에서는 읽기와 쓰기 스트림이 독립적으로 동작한다. 쉽게 말하면 읽기 스트림이 동작하는 동안 쓰기 스트림이 동작하는 것이 가능하다는 뜻이다. 이는 적절한 백프레셔 관리가 없다면 한 쪽 스트림의 처리 속도가 늦어지는 문제를 일으킬 수 있다. 예를 들어 쓰기 속도가 너무 빠르다면 쓰기에 메모리를 사용하느라 읽기에 사용할 메모리가 부족하여 처리가 지연될 수 있는 것이다. 앞서 readable/writable stream에서 언급한 백프레셔 관리 방법들을 적절하게 적용하는 것이 중요하다.
     

    // Duplex Stream 구현 예시
    const { Duplex } = require('stream');
    const duplex = new Duplex({
      read(size) {
        this.push('읽기 데이터');
        this.push(null);
      },
      write(chunk, encoding, callback) {
        console.log(`쓰기 데이터: ${chunk}`);
        callback();
      }
    });
    duplex.write('쓰기 요청');
    duplex.on('data', (chunk) => console.log(chunk.toString()));

     
     

    5) Transform Stream:

     마찬가지로 읽기와 쓰기가 가능한 스트림인데, 데이터 변환 로직이 추가된다는 차이점이 있다. 앞선 스트림들과 다르게 read와 write 대신 transform과 flush를 구현하게 되는데, transform은 청크를 받았을 때 청크 데이터를 다른 내용으로 변환하는 로직(영문 대소문자 변환, 내용물 필터링 등)을 수행하고 결과를 콜백으로 반환한다. 모든 데이터의 수신이 끝났을 때 잔여 데이터를 처리하기 위해 flush 메서드가 사용된다.
     

    const { Transform } = require('stream');
    const upperCase = new Transform({
      transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
      }
    });
    process.stdin.pipe(upperCase).pipe(process.stdout);

     
     앞에서 언급하지 않았지만, 각 스트림에 있는 pipe() 메서드를 사용하면 스트림을 다른 스트림과 연계하여 데이터를 전달할 수 있다. 예를 들어, 읽기 스트림에서 얻은 결과를 pipe하여 쓰기 스트림에 전달하면 읽은 내용을 다른 곳에 쓸 수 있게 된다.
     
     

    6) Summary:

     Stream은 대용량 데이터를 작은 청크 단위로 나누어 처리하여 메모리 사용을 절약하고, 실시간 데이터를 효율적으로 처리할 수 있도록 도와준다. 데이터 소비가 생산을 따라가지 못하는 경우 메모리 문제가 발생할 수 있으며, 이를 백프레셔라고 한다. 읽기 스트림을 중단/재개하거나 쓰기 스트림의 작성 빈도를 조절하는 등 적절한 백프레셔 관리를 통해 시스템 성능 저하를 미연에 방지할 수 있다.
     
     
     
    --
     
    REFERENCES:
     
    https://nodejs.org/api/stream.html
     > Node.js Docs
     
    https://nodejs.org/pt/blog/feature/streams2
     > Node.js 블로그 (구 버전 노트)

    728x90

    'TIL' 카테고리의 다른 글

    Express.js와 라우팅, 그리고 미들웨어  (0) 2024.10.14
Designed by Tistory.