Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (1)

2025. 12. 20. 15:59·dev/ai

평소 토스증권의 급상승 탭에서 소액으로 해외 주식 단타를 하던 차에 급등, 급락 이벤트 직전 몇 초 동안 abnormal한 전조 패턴이(주포가 리드하고 개인들이 달라붙으며 거래량이 몰림에 따라 나타나는) 공통적으로 존재함을 경험적으로 감지했다.

 

이와 같은 패턴을 감지하는 분류 모델을 만들어서 실제 매수, 매도에 활용해보고자 퀀트 투자에서 쓰일 법한 시장 데이터 제공 api를 알아보기 시작했다.

 

잘만 찾는다면 샘플 데이터도 무제한으로 모으고 사전학습된 모델에 실시간으로 입력시켜서 매수, 매도 결정에 활용까지 가능하겠다 싶었지만 대부분의 api들이 유료거나 무료여도 호출 횟수 제한이 있는걸 확인했다. 

 

또한 실제 감지하고자 하는 패턴은 경험적으로 초단기 패턴이라 구하기 쉬운 1시간봉, 1일봉은 너무 길고 틱 단위 데이터를 수집해서 직접 초단위 캔들을 만드는걸로 방향을 잡았다.

+ 국내주식에선 키움증권 api만이 1분봉까지 제공하는데 이마저도 길고, 국내주식의 급등주는 규모가 작아 노이즈(개인이 달라붙는 정도까지는 못가는 상황)가 많을거라 판단

 

결국 어느정도 규모가 있으며 개인 참여도가 높고 틱 단위 데이터 제공이 열려있는 개방적인 시장 -> 코인 시장

Upbit에서 시장 데이터를 api와 웹소켓으로 횟수 제한없이 무료로 제공한다는 것을 확인하고 다음과 같은 흐름으로 구상하였다.

 

Upbit WebSocket을 활용해 실시간 암호화폐 시세 데이터를 수집하고,
시장 미시구조 기반 피처를 계산하여 급등 코인의 단기 움직임을 예측하는
이벤트 기반 머신러닝 모델

 

일단 데이터 수집을 위해 타겟으로 삼을 종목을 결정해야했는데 모델 목적상 단기 급등/급락 패턴이 잘 드러나는 가격 변동성이 큰 종목이어야했다. 일단은 원화 마켓의 코인만 취급하도록 했다. (다른 마켓도 넣고싶으면 "KRW-", "BTC-", "USDT-")

# Filename: upbit_data_collector.py

import requests
import websocket
import json
import threading
import time


# 설정
CHECK_INTERVAL = 30      # 시장 체크 간격 (초)
TOP_N = 5               # 상위 N개 급증 코인 선택
SPAWN_WS = {}            # WebSocket 스레드 저장
tick_data = {}          # 틱 데이터 저장용

# REST API로 시장 데이터 조회
def get_markets():
    url = "https://api.upbit.com/v1/market/all"
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        markets = [m['market'] for m in resp.json() if m['market'].startswith("KRW-")]
        return markets
    except Exception as e:
        print(f"get_markets error: {e}")
        return []

def get_tickers(markets):
    if not markets:
        return []
    url = "https://api.upbit.com/v1/ticker"
    params = {"markets": ",".join(markets)}
    try:
        resp = requests.get(url, params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print(f"get_tickers error: {e}")
        return []

# WebSocket 구독
def ws_on_message(ws, message):
    data = json.loads(message)
    symbol = data["code"]
    tick_data[symbol] = {
        "price": data["trade_price"],
        "volume": data["trade_volume"],
        "side": data["ask_bid"],
        "timestamp": data["timestamp"]
    }
    print(f"{symbol} | price: {data['trade_price']} | volume: {data['trade_volume']} | side: {data['ask_bid']}")

def start_ws(symbols):
    payload = [
        {"ticket": "tick_service"},
        {"type": "trade", "codes": symbols, "isOnlyRealtime": True}
    ]
    ws = websocket.WebSocketApp(
        "wss://api.upbit.com/websocket/v1",
        on_message=ws_on_message
    )
    threading.Thread(target=lambda: ws.run_forever(), daemon=True).start()
    time.sleep(1)  # WS 연결 안정화
    ws.send(json.dumps(payload))
    return ws

# 가격 급상승 N개 종목 가져오기
prev_prices = {}  # 직전 종가 저장용

def detect_price_spike():
    global SPAWN_WS, prev_prices
    markets = get_markets()
    tickers = get_tickers(markets)

    spike_symbols = []

    for t in tickers:
        symbol = t["market"]
        price = t["trade_price"]
        prev_price = prev_prices.get(symbol, t["prev_closing_price"])  # 처음엔 직전 종가 사용
        prev_prices[symbol] = price  # 현재 가격 저장

        # 가격 상승률 계산
        price_change = (price - prev_price) / prev_price

        spike_symbols.append((symbol, price_change))

    # 상위 N개 상승 종목 선택
    spike_symbols = sorted(spike_symbols, key=lambda x: x[1], reverse=True)[:TOP_N]
    top_symbols = [s[0] for s in spike_symbols]

    # 신규 종목만 WebSocket 구독
    new_symbols = [s for s in top_symbols if s not in SPAWN_WS]
    if new_symbols:
        print("Price spike detected:", new_symbols)
        SPAWN_WS.update({s: True for s in new_symbols})
        start_ws(new_symbols)

# 메인 루프
if __name__ == "__main__":
    print(f"Starting Upbit price spike collector. interval={CHECK_INTERVAL}s top_n={TOP_N}")
    try:
        while True:
            detect_price_spike()
            time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        print("Stopped by user")

 

 

전체 원화 마켓에서 직전 종가(30초마다 갱신) 대비 상승률 가장 높은 5개 종목을 가져와서 틱 데이터를 찍어봤다.

+ 처음 체크할 때는 전일 종가 기준으로 상승률 계산

* 원화 마켓이므로 price, volume단위는 ₩ : 원화

CTC, ORDER, SXP가 엎치락뒤치락하며 실시간으로 순위가 계속 바뀌지만 직접 상승률을 계산하여 추출한 종목과 실제 upbit에서 보이는 종목이 동일해 보인다.

'dev > ai' 카테고리의 다른 글

Codex Skill 등록하기 - bcpprm 사례로 익히는 워크플로 자동화 가이드  (1) 2026.05.23
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (5)  (1) 2025.12.22
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (4)  (0) 2025.12.22
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (3)  (0) 2025.12.22
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (2)  (0) 2025.12.21
'dev/ai' 카테고리의 다른 글
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (5)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (4)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (3)
  • Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (2)
cusum26
cusum26
  • cusum26
    CUSUMlog
    cusum26
  • 전체
    오늘
    어제
    • 분류 전체보기 (18)
      • dev (15)
        • blockchain (1)
        • ai (6)
        • web (0)
        • infra (4)
        • app (4)
      • cs (1)
        • blockchain (1)
      • scalability (2)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    FanoutTask
    lazy fanout
    bccprm
    kafka ui
    kafkaListenerContainerFactory
    도메인 이벤트
    비동기
    컨슈머 오프셋
    KafkaConfig
    fanout-on-read
    fanout-on-write
    Merkle Trie
    codex skill
    min.insync.replicas
    group metadata
    __consumer_offsets
    acks
    msa
    Kafka
    consumer offset
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
cusum26
Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (1)
상단으로

티스토리툴바