Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (3)

2025. 12. 22. 16:22·dev/ai

데이터 수집 과정에 사용될 하이퍼파라미터는 다음과 같이 설정했다.

INTERVAL_SEC = 30
LABEL_THRESHOLD = 0.008  # 0.8% 수익률 기준
SELECT_TOP_N = 5
SEQ_LEN_SEC = 300  # 예측에 쓸 과거 데이터 기간 : 5분 = 시퀀스 데이터 구성하는 인터벌 개수 : 10
MAX_SEQUENCES = 2000   

FEATURE_COLS = [
    'slope',
    'accel',
    'last_return',
    'cusum_pos',
    'cusum_neg',
    'volume_ratio',
    'bid_ask_imbalance',
    'spread_ratio'
]

 

feature 컬럼 중 앞의 6개는 거래 정보(tick data), 뒤의 2개는 주문 정보(order book)가 필요하다. 따라서 5개 종목을 리스트에 저장하고 각 종목별로 웹소켓으로 받은 거래 정보와 주문 정보를 별도의 deque에 저장하도록 구성했다. 이를 위해 trade와 orderbook을 한 번에 구독할 수 있는 payload를 웹소켓으로 전송했다.

class WSTickCollector:
    def __init__(self, markets, maxlen=5000):
        self.markets = list(markets)
        self.ticks = {m: deque(maxlen=maxlen) for m in self.markets}
        self.orderbooks = {m: deque(maxlen=maxlen) for m in self.markets}
        self.lock = threading.Lock()
        self.ws = None

    def on_message(self, ws, message):
        try:
            data = json.loads(message)
            market = data.get('code') or data.get('market')
            if not market or market not in self.markets:
                return

            # 거래 데이터
            if 'trade_price' in data:
                tick = {
                    'market': market,
                    'trade_price': float(data.get('trade_price', 0)),
                    'trade_volume': float(data.get('trade_volume', 0)),
                    'timestamp': data.get('timestamp'),
                    'ask_bid': data.get('ask_bid'),
                }
                with self.lock:
                    self.ticks[market].append(tick)

            # orderbook 데이터
            elif 'orderbook_units' in data:
                ts = data.get('timestamp')
                ob_batch = []
                for u in data['orderbook_units']:
                    ob_batch.append({
                        'market': market,
                        'timestamp': ts,
                        'bid_price': float(u['bid_price']),
                        'bid_size': float(u['bid_size']),
                        'ask_price': float(u['ask_price']),
                        'ask_size': float(u['ask_size']),
                    })
                with self.lock:
                    self.orderbooks[market].extend(ob_batch)

        except Exception as e:
            logging.debug("WS message parse error: %s", e)

    def on_open(self, ws):
        payload = [
            {"ticket": "ml_dataset_collector"},
            {"type": "trade",
             "codes": self.markets,
             "isOnlyRealtime": True
            },
            {"type": "orderbook",
             "codes": self.markets
            }
        ]
        ws.send(json.dumps(payload))
        logging.info("WebSocket subscription sent for %d symbols", len(self.markets))

    def start(self):
        def run_ws():
            import websocket
            self.ws = websocket.WebSocketApp(
                "wss://api.upbit.com/websocket/v1",
                on_message=self.on_message,
                on_open=self.on_open
            )
            self.ws.run_forever()

        t = threading.Thread(target=run_ws, daemon=True)
        t.start()
        import time
        time.sleep(1)

    # deque 비우기 전에 Lock 걸고 모든 메시지 추출
    def pop_all(self):
        with self.lock:
            out_ticks = {m: list(self.ticks[m]) for m in self.markets}
            out_orderbooks = {m: list(self.orderbooks[m]) for m in self.markets}
            for m in self.markets:
                self.ticks[m].clear()
                self.orderbooks[m].clear()
        return out_ticks, out_orderbooks

 

deque에 너무 많은 데이터가 들어와서 메모리 과다 사용하는 걸 방지하기 위해 max_len을 5000으로 설정했다. 추가로 deque에 추가하거나 pop할때 그 유명한 race condition을 막기 위해 threading.Lock()을 활용했다. 웹소켓으로 deque를 채우는 쓰레드와 pop으로 deque를 비우는 쓰레드가 동시에 실행되므로 race condition 정석 상황이다.

 

이에 추가로 한 번에 하나씩 들어오는 틱 데이터는 append로 처리해서 Lock 점유를 최소화하고 오더북은 extend로 처리해서 atomicity를 보장했다고 한다(gpt가). websocket을 받는 thread는 daemon = True로 데몬 스레드로 실행해서 무한 루프 작업이 안전하게 종료될 수 있게 짰다고 한다(gpt가). 운영체제에서 배운 개념이 이렇게 쓰이는걸 보니 반가웠다. threading.Lock()이 파이썬의 mutex 구현체인 것도 알게됐다.

 

흔하진 않지만 네트워크 지연 등으로 한 구간동안 여러 인터벌의 데이터가 생기거나 틱이 아예 없는 경우 어떤 인터벌 데이터도 안 생기는 상황도 분명 생길 수 있다. 전자는 가장 먼저 받은 인터벌에 해당하는 틱 데이터들만 집계하도록 했고 후자는 전통적인 증권사의 캔들 데이터 제공 api 관례를 따라 가격 정보에 대해선 직전 구간의 값을 유지하며 거래량 정보에 대해 0을 채우도록 처리했다.

 

틱과 오더북 데이터는 기본적으로 시간 기준 정렬한 상태로 제공한다는 보장이 없기 때문에 받은 데이터를 인터벌로 동기화해서 붙이는게 중요하다. 이 과정에서 거래량이 적은 종목들은 ohlcv가 채워져도 orderbook의 필드가 전부 채워지지 못해서 버려지는 인터벌도 충분히 존재할 수 있다. 감지하고자하는 단기 패턴은 필연적으로 거래 정보에 기인한 것이므로 모델에게 조금이라도 의미 있는 데이터를 학습시키기 위해 오더북 데이터가 완전히 채워진 인터벌만 feature 계산에 활용하기로 결정하였다. 

 

이렇게 인터벌동안 받은 데이터를 인자에 담아 aggregate_interval을 호출하며 본격적인 데이터 수집이 시작된다. 해당 메서드는 틱 데이터와 오더북을 집계하여 인터벌을 대표하는 DataFrame을 리턴하는 메서드인 aggregate_ticks, aggregate_orderbook를 포함하는 wrapper 함수이다. 이 일련의 과정은 종목 안에서 일어나는 것이므로 종목별로 반복하는 for 문 안에서 실행되며 종목별로 history를 운영하여 로직이 꼬이지 않도록 했다.

 

인터벌을 대표하는 집계값인 ohlcv, orderbook 데이터가 완성되면 feature 컬럼 계산이 가능해지는데 전에 언급했듯이 차분이 필요한 값이 있는 feature 특성상 집계값 세 개가 모여야 모든 컬럼이 제대로 계산된다. 따라서 제대로 계산된 세번째 인터벌 feature 벡터부터 시퀀스 데이터의 첫 인터벌을 구성하도록 짰고, 이를 확인할 수 있게 log_t0_feature를 통해 로그를 찍어 확인했다.

각 종목별 계산된 t0 feature의 로그
첫 feature 계산 완료 이후 인터벌부터는 수집한 틱 데이터 수와 feature 계산 완료된 인터벌 수 출력

 

logged_t0 배열로 마킹해서 종목별로 t0 feature 벡터가 완성된 시점에 처음 한번만 찍도록 했다. 30초 간격으로 수집한 데이터 중 틱 데이터 개수와 그를 이용하여 feature의 모든 컬럼 계산이 끝난 인터벌의 수를 출력하도록 했다. feature 벡터 11개(10개의 과거 인터벌 + 1개의 미래 인터벌)가 모여 하나의 시퀀스 데이터를 완성한 인터벌마다 새로 저장된 시퀀스 데이터 수와 지금까지 저장된 시퀀스 데이터셋의 shape를 출력하도록 했다.

 

No sequences yet에서 처음 Saved new sequences로 바뀌는 인터벌에서의 shape (3, 10 ,8) 은 (num_samples, SEQ_LEN, PER_STEP_FEATURE)로 (샘플 수, 시퀀스 길이(10 인터벌), 인터벌 당 feature 컬럼수(8))을 뜻한다. 

 

즉, 10개의 feature벡터가 모여서 하나의 시퀀스 데이터를 완성했고 각 feature를 이루는 값은 8개이며 이 시퀀스 데이터가 3개 만들어졌다는 뜻이다.

 

시퀀스 데이터가 생긴 첫 인터벌이므로 3개 종목에 대해 첫 시퀀스 데이터가 완성됐다고 볼 수 있을 것이다.

11개 인터벌 이후 인터벌마다 새로 완성된 시퀀스 데이터 수와 저장된 총 데이터 shape 출력

 

전에 로그로 확인한 t0 feature들이 시퀀스 데이터를 실제로 구성하는지  csv 파일을 확인해봤다. 1행, 2행, 3행, 5행, 16행의 시퀀스 데이터가 각각 위의 5종목의 t0를 구성하는 feature임을 확인할 수 있다. 중간에 여러 시퀀스 데이터가 끼여있는 건 뒤쪽 종목의 거래량이 적어 오더북 데이터가 채워지지 않아 t0 feature를 완성하지 못하는 동안 거래량이 많은 종목들이 한 인터벌씩 슬라이드하며 다음 인터벌의 feature를 먼저 완성했기 때문이다.

실제 시퀀스 데이터를 이루는 로그의 t0 feature


이렇게 10개의 인터벌 feature가 모이면(t0~t9) t10의 last_return 필드(수익률=직전 대비 가격 상승률) 값을 기준으로 라벨링을 진행한다. 가장 중요한 label 단계이다. 아무래도 차트보고 직접 분류하는 것이 정확하겠지만 애초에 급등과 급락이라할 만한 이벤트가 흔치 않을 뿐더러 30초마다 쉴새없이 들어오는 시퀀스 데이터를 일일이 확인하는건 불가능하므로 rule based로 자동 라벨링하기로 결정했다.

 

역시 label 기준이 되는 threshold 잡는 것이 관건이다. 10개의 인터벌 feature(30초*10=5분)를 하나의 시퀀스 데이터로 묶고 그 다음 인터벌(다음 30초)의 수익률(return)을 기준으로 분류하는 구조이므로 threshold가 너무 크면 의미있는 패턴을 학습하지 못하게 되고, 작으면 의미없는 패턴도 학습하게 된다. 일단 특정 종목의 차트를 보고 의미있는 변동성이다라고 하는 구간을 보고 감으로 0.8%라는 threshold를 임시로 정했다.

 

즉, 하나의 시퀀스 데이터는 과거 5분동안의 feature 입력을 받고 그 다음 30초동안 0.8% 이상의 상승이 있었다면 2, 0.8% 이상의 하락이 있었다면 0, 그 외는 1로 label을 결정했다.

 

def create_sequences_one_market(
    df_feat: pd.DataFrame,
    features: list[str],
    seq_len_sec: int,
    interval_sec: int,
    threshold: float
    ):

    seq_len = seq_len_sec // interval_sec
    label_len = 30 // interval_sec

    if len(df_feat) < seq_len + label_len:
        return None, None, None

    X, Y, seq_ids = [], [], []
    
    returns = df_feat['last_return'].values

    for i in range(len(df_feat) - seq_len - label_len + 1):
        seq_x = df_feat.iloc[i:i+seq_len][features].values
        X.append(seq_x)

        # 시퀀스 끝에서 다음 label_len 구간의 수익률로 라벨 계산
        future_return = np.prod(1 + returns[i+seq_len:i+seq_len+label_len]) - 1

        if future_return >= threshold: # threshold 이상으로 상승 = 매수판단 했어야 함
            Y.append(2)
        elif future_return <= -threshold: # threshold 이상으로 하락 = 매도판단 했어야 함
            Y.append(0)
        else:
            Y.append(1)                 # 그 외 : 관망

        # 시퀀스 시작 interval 기준으로 seq_ids 저장
        seq_ids.append(df_feat['interval'].iloc[i])  

    return np.array(X), np.array(Y), seq_ids

 

한 행으로 구성된 시퀀스 데이터의 마지막 값은 label 컬럼이다. 역시 대부분의 데이터는 1에 속하지만 간간이 2가 보인다. 차트와 대조해본 결과 나름의 상승세가 반영된 데이터임을 확인했다.

클래스 1 사이에 간간이 존재하는 클래스 2의 데이터

 

위의 WSTickCollector로 Upbit의 웹소켓 연결을 완료한 후 데이터 수집 과정 전체적인 흐름은 다음과 같다.

    #메인 루프
    saved_intervals = set()           # interval 단위 중복 방지
    features_by_market = {}
    market_history = {}
    logged_t0 = {}
    saved_sequence_ids = set()      # 시퀀스 단위 저장 체크 및 MAX_SEQUENCES 제한용


	try:
        while True: 
            time.sleep(INTERVAL_SEC)

            # 틱 수집
            ticks_by_market, orderbooks_by_market = collector.pop_all()
            tick_summary = {m: len(ticks_by_market.get(m, [])) for m in symbols}
            logging.info("Interval collected ticks: %s", tick_summary)

      		# 종목별 데이터 가공
            for market in symbols:
                ticks = ticks_by_market.get(market, [])
                orderbooks = orderbooks_by_market.get(market, [])

                # interval 단위 집계 (OHLCV + orderbook)
                agg = aggregate_interval(
                    ticks=ticks,
                    orderbooks=orderbooks,
                    interval_sec=INTERVAL_SEC
                )
                if agg is None or agg.empty:
                    continue

                agg['interval'] = pd.to_datetime(agg['interval'])

                # 히스토리 누적 (중복 interval 제거 필수)
                if market not in market_history:
                    market_history[market] = agg.copy()
                    logged_t0[market] = False
                else:
                    market_history[market] = (
                        pd.concat([market_history[market], agg], ignore_index=True)
                        .drop_duplicates(subset=['interval'])
                        .sort_values('interval')
                        .reset_index(drop=True)
                    )

                # feature 계산
                df_feat = compute_features_one_market(
                    agg=market_history[market],
                    interval_sec=INTERVAL_SEC,
                    verbose=False
                )

                # t0 feature 로그
                if not logged_t0[market] and len(df_feat) >= 1:
                    log_t0_feature(
                        agg=market_history[market],
                        df_feat=df_feat,
                        feature_cols=FEATURE_COLS,
                        t0_index=0
                    )
                    logged_t0[market] = True

                # features_by_market 누적
                if df_feat is not None and not df_feat.empty:
                    if market not in features_by_market:
                        features_by_market[market] = df_feat
                        added_count = len(df_feat)
                    else:
                        old_len = len(features_by_market[market])
                        features_by_market[market] = pd.concat(
                            [features_by_market[market], df_feat],
                            ignore_index=True
                        ).drop_duplicates(subset=['interval']).reset_index(drop=True)
                        added_count = len(features_by_market[market]) - old_len

                    logging.info(
                        "Market %s: +%d ML-ready intervals (total=%d)",
                        market,
                        added_count,
                        len(features_by_market[market])
                    )
            # 시퀀스 생성
            X_all, Y_all, seq_ids = create_sequences_all_markets(
                features_by_market=features_by_market,
                features=FEATURE_COLS,
                seq_len_sec=SEQ_LEN_SEC,
                interval_sec=INTERVAL_SEC,
                threshold=LABEL_THRESHOLD
            )

            if X_all is None:
                logging.info("No sequences yet")
                continue

            # dedup + CSV 저장
            X_new, Y_new = [], []

            for x, y, (market, start_interval) in zip(X_all, Y_all, seq_ids):
                start_interval = pd.to_datetime(start_interval)

                # 시퀀스 단위 dedup만 수행
                if (market, start_interval) in saved_sequence_ids:
                    continue

                saved_sequence_ids.add((market, start_interval))
                X_new.append(x)
                Y_new.append(y)

            # CSV 저장
            if X_new and len(saved_sequence_ids) < MAX_SEQUENCES:
                X_new = np.array(X_new)
                Y_new = np.array(Y_new)

                save_sequence_csv(
                    X_all=X_new,
                    Y_all=Y_new,
                    feature_dim=len(FEATURE_COLS),
                    seq_len=SEQ_LEN_SEC // INTERVAL_SEC,
                    save_path=SAVE_PATH
                )

                logging.info(
                    "Saved new sequences: %d (total=%d, shape=%s)",
                    len(X_new),
                    len(saved_sequence_ids),
                    X_new.shape
                )

            elif len(saved_sequence_ids) >= MAX_SEQUENCES:
                logging.info("Reached max sequences (%d). Stopping CSV save.", MAX_SEQUENCES)
                break

            # CSV 저장 후 features_by_market에서 interval 제거
            for market, df in features_by_market.items():
                mask = pd.Series(True, index=df.index)
                for _, start_interval in seq_ids:
                    if _ != market:
                        continue
                
                # 시퀀스 interval 범위 계산
                seq_start = start_interval
                seq_end = start_interval + pd.Timedelta(seconds=SEQ_LEN_SEC - INTERVAL_SEC)
                mask &= ~((df['interval'] >= seq_start) & (df['interval'] <= seq_end))
            features_by_market[market] = df[mask].reset_index(drop=True)


    except KeyboardInterrupt:
        logging.info("Stopped by user")

 

각 종목의 틱 데이터에서 캔들 추출, 캔들에서 feature 추출, 여러 feature를 합쳐 하나의 시퀀스 데이터를 만들고 모든 종목의 시퀀스 데이터를 하나의 csv파일로 합치는 일련의 구현을 완료했다. 파이썬의 agg, dataframe 등 기본 개념이 전혀 없이 배열같은 간단한 자료 구조만 다뤄본 나에게 이런 고차원 자료 구조는 문법도 사용법도 생소했다. 처음엔 일단 바이브 코딩으로 도전해서 받은 코드로 결과물만 찍어봤다. 엉터리인걸 확인하고 모든 과정에 로그를 찍고 검증하며 GPT 무료 버전과 3일 밤낮으로 사투를 벌였다. 하루에 한 번 잠깐 오는 똑똑한 gpt로 1보 전진하고 나면 그 뒤에 오는 멍청한 gpt가 2보 후퇴시켜서 결국 머리 싸매고 코드 흐름 따라가면서 체크했다. 

 

유료버전 구독해야겠다.

'dev > ai' 카테고리의 다른 글

Codex Skill 등록하기 - bcpprm 사례로 익히는 워크플로 자동화 가이드  (1) 2026.05.23
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (5)  (1) 2025.12.22
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (4)  (0) 2025.12.22
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (2)  (0) 2025.12.21
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (1)  (0) 2025.12.20
'dev/ai' 카테고리의 다른 글
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (5)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (4)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (2)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (1)
cusum26
cusum26
  • cusum26
    CUSUMlog
    cusum26
  • 전체
    오늘
    어제
    • 분류 전체보기 (18)
      • dev (15)
        • blockchain (1)
        • ai (6)
        • web (0)
        • infra (4)
        • app (4)
      • cs (1)
        • blockchain (1)
      • scalability (2)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    bccprm
    비동기
    FanoutTask
    kafka ui
    __consumer_offsets
    Kafka
    fanout-on-read
    컨슈머 오프셋
    KafkaConfig
    codex skill
    kafkaListenerContainerFactory
    도메인 이벤트
    fanout-on-write
    lazy fanout
    group metadata
    acks
    Merkle Trie
    msa
    consumer offset
    min.insync.replicas
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
cusum26
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (3)
상단으로

티스토리툴바