우리는 어떻게 해외주식 서비스 안정화를 이뤘는가

김광훈 · 토스증권 Server Developer
2024년 7월 16일

토스증권 해외주식 서비스 소개 ✌️

안녕하세요, 토스증권 Server Developer 김광훈입니다. 제가 근무하고 있는 해외주식 플랫폼 팀은 미국 주식을 중심으로 해외 주식 원장을 담당하고 있어요. 원장이란 증권 서비스에서 가장 주요한 영역 중 하나이며, 금융거래를 기록하는 장부를 말해요. 저희 팀에서는 고객의 주문, 자산, 권리 그리고 종목 정보 관리와 환전까지 해외주식 서비스 제공에 필요한 모든 거래와 정보들을 원장에 기록하는 개발과 운영을 하고 있어요.

이번 글에서는 저희 팀이 외부 브로커와 통신하고 있는 해외 주식 서비스를 안전하게 운영하기 위해 고민했던 내용을 공유하려고 합니다.

토스증권 미국 주식 매매 아키텍처

먼저, 미국 주식 매매 아키텍처를 같이 살펴볼게요. 사용자가 토스증권에서 매매 요청을 하면 현지 브로커로 요청을 보내고 브로커는 현지 미국 거래소에서 매매를 체결하고 응답을 보내는 구조입니다. 브로커라는 용어가 생소할 수 있는데요, 이름 그대로 주식 매매를 현지에서 처리를 해주는 역할을 하는 회사에요. 미국에서 직구를 할 때 중간에 상품을 현지에서 구매하고 한국으로 배송해주는 대행사와 비슷한 것이죠.

브로커와 통신은 HTTP, FIX 두 가지 프로토콜을 사용하고 있어요. 메인으로는 FIX 프로토콜을 사용하고 있고, 브로커 요구사항에 따라 HTTP 를 부분적으로 사용하여 요청 보내고 있어요. 요청에 대한 응답은 최종적으로 Kafka나 SQS와 같은 큐를 사용하여 비동기로 처리하고 있어요.

문제 원인 파악 😢

그럼 이제 토스증권에서 브로커와 통신하는 과정에서 어떤 문제가 왜 일어났는지 설명드릴게요. 아래 그림은 브로커 이슈로 문제가 된 상황 당시의 핀포인트 그래프입니다. 그래프를 보면 브로커 응답 시간이 조금씩 지연이 발생하였고, 지연된 주문들은 계속 대기하면서 쌓이게 되고 결국에는 5000ms 이상 응답이 걸리게 되면서 이슈가 발생한 것을 볼 수 있어요.

문제의 원인은 크게 두 가지였어요.

먼저 정규장 시작에 크게 뛰는 트래픽이었는데요. 아래 그래프는 주문 API 호출 건 수를 시간대별로 나타내고 있어요. 미국의 정규장 시작(22시30분)부터 호출 건 수가 급격히 증가하는 것을 볼 수 있어요. 해외주식 서비스 특성상 정규장 초반에 트래픽이 급격히 증가하고 최소 두 시간 정도는 지속돼요. TPS 를 정규장 시간과 아닌 시간을 비교해보면 20배 이상 차이가 나기에, 대부분의 운영 이슈는 정규장 오픈 초반에 발생할 수밖에 없어요.

토스증권 사용자가 늘어나는 것도 문제의 원인이었어요.해외주식 주문 증가 추세를 보기 위해 주문 접수 API 요청을 월 별로 집계를 해보았어요. 아래 그래프를 보면 계속 주문 수는 증가하고 있는 추세를 볼 수 있어요. 오픈 초기 주문 요청량 대비 현재 요청량을 비교해보면 약 30배 넘게 증가한 것을 볼 수 있었습니다.

서비스가 오픈하고 폭발적인 성장을 하게 되었고, 동시에 브로커도 함께 처리해야 하는 주문도 증가했어요. 그로 인하여 보이지 않았던 이슈들이 점차 수면 위로 올라왔고, 크고 작은 이슈들을 겪게 되었습니다.

브로커와 통신 과정에서 이슈를 겪고 나서 팀에서는 통신 구간에 엔지니어링을 시작을 했고, 가장 먼저 브로커가 수용할 수 있는 최대 TPS 이상 보내지 않도록 트래픽 제어를 했어요. 트래픽을 조절해서 보냈음에도 브로커 측에서 발생한 문제를 빠르게 감지하고 다른 브로커로 주문 요청을 보낼 수 있는 시스템을 만들어야 했어요. 시스템을 어떻게 만들었는지 설명드릴게요.

1. 트래픽 제어

주식을 예약 주문해보신 경험이 있나요? 해외 주식도 예약 주문을 받는데요. 토스증권은 예약 주문을 들고 있다가, 실제 주문 요청은 정규장이 시작된 뒤에 배치로 보내고 있어요. 정규장 시작 후 대략 100만 건 이상의 예약 주문을 브로커로 보내게 되는데, 한 번에 보내면 브로커에 부하를 줄 수 있어요.

그래서 브로커로 보내는 주문 요청 TPS를 제어를 할 필요가 생겼고, 이미 잘 만들어져 있는 resilience4j를 사용하여 간단하게 문제를 해결했어요. resilience4j 는 fault tolerance(내결함성)의 목적으로 나왔기 때문에 서킷 브레이커, 트래픽 제어, 재시도 등 분산 시스템 안정성과 탄력성을 높이기 위한 기능들을 지원해요.

class BatchJob {

    lateinit var rateLimiter: RateLimiter
    
    companion object {
        private const val JOB_CODE = "TradePreparedOrderExecutorJob"
    }
    
    fun run(context: RunContext): BatchResponse {
        val tps = arguments[2].toInt()
        rateLimiter = RateLimiterCreator.of(
		        limitRefreshPeriod = Duration.ofSeconds(1), 
		        limitForPeriod = tps, 
		        name = JOB_CODE
	      )
	      
	      rateLimiter.executeRunnable {
            preparedOrderBrokerRequestFacade.sendOrder()
        }
    }
}

위 코드는 예약 주문 배치 코드 예시입니다. 파라미터로 TPS를 받고, rateLimiter 객체를 입력받은 TPS로 객체 생성을 하고, runnable로 주문 요청하는 파라미터를 넘기면 TPS 파라미터 수만큼 요청을 제한해서 할 수 있어요.

object RateLimiterCreator {

    private val DEFAULT_TIMEOUT_DURATION = Duration.ofSeconds(5) // resilience4j 기본 설정

    fun of(
        limitRefreshPeriod: Duration,
        limitForPeriod: Int,
        name: String,
        timeoutDuration: Duration = DEFAULT_TIMEOUT_DURATION,
    ): RateLimiter {
        val config = RateLimiterConfig.custom()
            .limitRefreshPeriod(limitRefreshPeriod)
            .limitForPeriod(limitForPeriod)
            .timeoutDuration(timeoutDuration)
            .build()

        val registry = RateLimiterRegistry.of(config)
        val rateLimiter = registry.rateLimiter(name)

        return rateLimiter
    }
}

위 코드는 RateLimiterConfig 를 생성하는 코드예요.

설정값들을 yml 파일로 관리할 수 있지만, 배치잡 파라미터로 TPS를 제어하고 배치를 실행할 때 TPS를 결정하고 싶어 코드로 관리하는 선택을 했어요. 아래 이미지에 보이듯이 간단하게 파라미터만 넘기면 배치에서 브로커로 요청하는 TPS를 결정할 수 있어요.

2. 빠른 이상 탐지 및 브로커 전환

미국 주식 장 운영 시간 특성상 브로커 이는 주로 새벽에 발생하고, 사람이 수동으로 대응을 하기에는 한계가 있었어요. 수동으로 브로커 이슈를 감지하기 위해서는 사람이 24시간 대기를 해야 하고, 담당자가 문제가 발생했는지아닌지 판단을 하는 순간에 피해 규모는 점점 더 커질 수 있어요.

그래서 메인 브로커 이슈가 감지되면 미리 정해둔 룰에 의해 시스템이 감지하고 서브 브로커로 주문을 보낼 수 있는 이상 감지 시스템을 만들게 되었어요.

위 그림은 설계 당시 요구사항이에요. 요구사항을 토대로 팀에서는 브로커 이상 감지되면 결과를 이벤트로 발행하고 이벤트를 구독한 시스템에서 자율적으로 대응할 수 있는 시스템을 만들었어요. 이상 감지 시스템도 역시 잘 만들어진 Grafana, kibana 등의 모니터링 도구를 사용하여 쉽게 구축을 할 수 있었어요.

각 모니터링 도구는 설정한 룰에 매칭이 되면 이상 감지 시스템 API를 호출해서 정보를 제공하는 방식으로 설계했어요. 이후에 이상 감지 시스템은 Kafka 이벤트를 발행하고 각 토픽을 구독하고 있는 시스템에 이슈 상황을 전파를 할 수 있어요. 결국에 하나의 토픽을 여러 시스템이 구독하고 있는 구조이다 보니 자연스럽게 규격화된 JSON 구조를 여러 시스템이 사용할 수 있어요.

이번 글에서는 Grafana로 어떻게 이상 감지 시스템을 구축 했는지 더 자세히 설명드릴게요. Grafana를 사용한 이유는 로그보다는 시스템 메트릭으로 조건 지정을 하고 싶었고, 이슈를 대응할 때 그래프 시각화 정보도 중요하다고 생각했기 때문이에요.

이제 Grafana 설정을 어떻게 했는지 간단히 살펴봐요. Grafana 버전에 따라 세팅 설정 상이할 수 있어요. 이번 글에서는 v10.4.1 기준으로 작성했어요.

Notification policy

먼저 웹훅 세팅을 위해 Notification policy를 먼저 살펴볼게요. 원하는 이름을 설정과 Contact point 설정이 필요해요. Contact point로 알림을 어떻게 보낼지도 설정할 수 있어요.

설정한 Contact point를 살펴보면, Intergration을 Webhook 으로 설정하고 URL 탭에 이상 감지 시스템 API 를 작성을 하면 돼요.

Alert rule

여기까지 Notification policy 생성하는 방법에 대하여 간단하게 알아봤어요. 이제 Alert rule 을 생성하고 생성한 rule 에 Notification policy 를 붙여주면 끝이에요.

Alert rule 설정하는 것을 살펴보면 쿼리는 1분 동안 API 요청이 실패한 개수를 카운트하도록 했어요. 그리고 alert 조건으로 위 쿼리로 수집된 에러 API 개수가 100 개가 넘으면 발생하도록 설정했어요.

이제 alert 조건 만족이 얼마나 지속이 될 때, 알림을 트리거할지 설정해주세요. Evaluation interval 과 pending period를 설정할 수 있어요. Evaluation interval는 위에서 작성한 쿼리 결과를 어느 주기로 체크할지 설정할 수 있고, pending period는 위에서 classic condition으로 작성한 조건이 만족하고 어느 기간 동안 만족하면 alert 을 발생시킬지 설정하는 값이에요.

예를 들어 evaluation interval를 30초 그리고 pending period를 3분으로 설정을 했다면, 30초 주기로 쿼리 결과를 체크하고 만약 classic condition을 만족했다면, 만족한 시점 이후로 3분 동안 지속이 된다면 alert 을 발생하게 돼요.

아래 Enum 코드는 alert이 발생했을 때, 이상 감지 시스템에서 제공하는 상태 타입이에요.

enum class SystemStatus {
    HEALTHY,    // 시스템이 정상
    CAUTION,    // 시스템에 문제가 있을 수도 있음. 알아서 판단
    CRITICAL    // 시스템 down
}

이상 감지 시스템에서는 모니터링 도구로부터 API 호출을 받고 전달받은 데이터를 기반으로 위 세 가지 타입 중 하나를 택하여 Kafka 이벤트를 발행하게 돼요.

@Component
class BrokerFailOverConsumer(
    private val brokerFailoverService: BrokerFailoverService,
    private val slackSender: SlackSender,
) {
    private val log = KotlinLogging.logger { }

    @KafkaListener(
        topics = ["topicName"],
        clientIdPrefix = "appName",
        containerFactory = "falioverContainerFactory"
    )
    fun consume(record: ConsumerRecord<String, String>) {
        val json = record.value()
        log.info { "system-status-event consume: $json" }

        val message = JsonUtil.fromJson(record.value(), SystemStatusRecord::class.java)
        
        when (message.status) {
            SystemStatus.CRITICAL -> {
                slackSender.sendCriticalSlackMessage(message)
                brokerFailoverService.failover()
            }
            SystemStatus.CAUTION -> {
                log.info { "브로커 상태 알림 수신: BrokerType=${message.type}, status=${message.status}" }
            }
            SystemStatus.HEALTHY -> {
                when (message.prevStatus) {
                    SystemStatus.CRITICAL -> slackSender.sendReturnHealthySlackMessage(message, message.prevStatus!!)
                    else -> log.info { "브로커 상태 알림 수신: BrokerType=${message.type}, status=${message.status}" }
                }
            }
        }
    }

위 코드는 이상 감지 시스템에서 발행한 Kafka 이벤트를 구독하고 CRITICAL 상황이면 브로커 failover 시키는 코드에요. 비즈니스 목적에 맞게 CRITICAL 이 아닌 경우에는 별도 대응을 하지 않고 로그 정도만 찍었습니다.

두 가지 케이스에 대하여 아래처럼 메신저 알림을 발송하고 있어요. 모든 임직원이 사용하는 메신저를 사용해서 담당자를 멘션해서 즉각 알림을 보낼 수 있어요.

  1. 브로커 상태가 CRITICAL 인 경우
  2. 브로커 상태가 HEALTHY 이지만 이전 상태가 CRITICAL 인 경우

3. 증권 원장에 MongoDB 도입

마지막으로 서비스가 성장하면서 데이터 사이즈도 같이 증가를 했어요. 따라서 데이터 관리를 어떻게 할지에 대한 고민거리도 같이 생겼어요. 단순한 조회 API가 500ms 이상 응답이 늦는 빈도가 증가하였고, 그중에 몇몇 API는 1000ms 이상 튀는 요청이 존재했어요.

테이블에 데이터가 크지 않았을 때는 문제가 되지 않았지만 데이터 사이즈가 증가하면서 비효율적인 쿼리들이 성능에 병목이 되었습니다.

따라서 여러 테이블들을 aggregate 한 테이블이 필요했어요. 데이터 저장소로 Oracle, MySQL 과 같은 RDB 혹은 MongoDB 사이에 많은 고민을 했어요. MongoDB를 후보군에 넣은 이유는 Oracle과 같은 RDB는 파티션 키가 없으면 모든 파티션을 검색하기 때문에 파티션 키가 필요하지만, MongoDB는 샤드 키가 없어도 어느 정도 준수한 성능 보장이 되기 때문이에요.

MongoDB는 샤드 키가 없이 쿼리를 날려도 모든 노드로 브로드캐스트 요청을 보내 해당 쿼리를 수행하고 mongos(라우터)에서 결과를 조합해서 변환해줘요. 즉, 모든 노드에서 병렬처리를 하기 때문에 적절한 인덱스만 구성이 되면 어느 정도 준수한 성능이 보장이 돼요.

파티션 키를 고민한 이유는 데이터가 증가하면서 Oracle DB에 있는 주요 테이블에 파티션 키를 잡을 필요성이 생겼기 때문이에요.

사내 Oracle DBA(DataBase Administrator)로부터 데이터가 많은 주요 원장 테이블에 파티션 전환을 해야한다는 이야기를 들었어요. 여기서 개발자와 DBA 사이에 파티션 키로 뭐로 잡을지에 대한 의견이 달랐어요.

개발자 입장에서는 증권의 모든 고객 요청은 계좌번호 조건을 찍고 조회하는 특성이 있기 때문에 계좌번호를 파티션 키로 잡고 싶은 니즈가 있었어요. 반면에 DBA 입장에서는 성능 향상도 중요하지만 디스크 저장도 같이 고려한다면 날짜를 파티션 키로 잡는 게 유리하기 때문에 날짜 단위로 파티션 키를 잡는 것을 권장했어요.

디스크 저장을 고려해야 하는 이유는 다른 DB에 비해 Oracle DB는 상대적으로 확장에 불리하기 때문에 디스크 관리도 같이 고민을 해주면 좋기 때문이에요. 날짜를 파티션 키로 잡게 되면 상대적으로 조회 빈도가 낮은 과거 데이터를 압축하기에 유리하기 때문에 압효율이 높아지고 디스크에 저장하는 데이터 사이즈를 많이 줄일 수 있어요.

결국 고민 끝에 팀에서는 MongoDB를 도입하기로 선택했어요.

원천 데이터를 저장하는 Oracle DB는 날짜로 파티션 키를 잡아 장비 운영 효율을 높이고, 비즈니스 목적으로 계좌 단위로 조회를 하는 데이터는 MongoDB로 조회를 함으로써 상황에 맞게 적절하게 모두 대응을 할 수 있다는 점이 팀 상황에 매우 필요하다고 생각했어요. 그리고 성능도 준수하게 보장이 되고, 샤드 확장도 상대적으로 자유로우니 운영 비용도 크지 않다고 생각했습니다.

이제 Oracle DB에서 MongoDB로 어떻게 데이터 마이그레이션했는지 살펴볼게요.

위 그림은 데이터 마이그레이션 플로우에요. 문제가 된 Oracle 쿼리는 성능이 좋지 않았기 때문에, 빠른 배치 처리를 위해 Hadoop 기반 병렬처리 쿼리 엔진인 Impala에서 원본 데이터 조회를 했어요. Impala는 이미 사내에서 데이터 웨어하우스로 사용 중이기 때문에 별도 환경 구축 없이 빠르게 사용할 수 있었습니다.

마이그레이션 이후 발생하는 데이터는 최대한 지연 없이 원본 데이터와 정합을 유지하기 위해 Kafka를 사용해서 실시간으로 데이터를 저장했어요. Kafka 이벤트 또한 이미 발행하고 있었기 때문에 MongoDB 데이터 저장하는 코드만 확장해서 빠르게 개발할 수 있었습니다.

마무리 🙇‍♂️

해외주식 서비스가 성장하는 과정 속에 발생한 문제들을 해결한 방법을 소개드렸는데요. 저 당시 팀에서는 언제 브로커 이슈가 발생할지 예상할 수 없기 때문에, 매우 빠르고 효율적으로 개발을 해야 했어요. 글에서 소개한 트래픽 제어, failover, 데이터 관리 등 간단하고 필수적인 기능들이지만 팀에서는 어떤 관점으로 문제를 인식했고 어떻게 지혜롭게 문제를 해결했는지 공유하고 싶었어요.

이번 글에서 최종적으로 전달하고 싶은 메시지는 먼저 문제의 핵심을 파악하고 이를 적정 수준의 엔지니어링을 통해 해결하는 것이 중요하다는 것이에요. 오버엔지니어링 없이 기존에 이미 잘 구현된 제품을 사용하여 문제를 빠르게 해결하고, 절약한 시간을 중요한 비즈니스 개발에 집중할 수 있었고, 이는 저희 팀이 빠르게 성장할 수 있는 비결이라고 생각해요.

댓글 0댓글 관련 문의: toss-tech@toss.im
연관 콘텐츠
㈜비바리퍼블리카 Copyright © Viva Republica, Inc. All Rights Reserved.