저번에 모델 학습까지 완료했으니 이제 실시간으로 검증해볼까 한다. 현재 기준 제일 상승률 높은 종목 1개를 구독해서 웹소켓으로 실시간으로 데이터를 받아 30초 인터벌마다 feature에 해당하는 입력을 모델에 넣었고, 모델이 분류한 다음 인터벌의 label을 action과 매핑해서 실제와 비교하는 흐름으로 진행했다. 물론 현재 구간의 label은 아직 정해지지 않은 미래이므로 현재보다 30초 전을 기준으로 실시간 취급해서 모델 출력과 실제를 비교하며 평가했다.
매 30초마다 이전 5분의 입력을 토대로 buy, sell, hold중 하나를 예측하도록 했고 예측값을 모아서 Acc 와 macro F1을 계산해서 출력하도록 구성했다. 물론 처음 5분동안은 데이터를 모으는 warming up 구간으로 설정했다.


ANIME 종목을 구독하고 실시간으로 비교해봤다. 급등 같은 이벤트가 흔치 않으므로 횡보 구간에서 모델이 익숙한 hold만 나올걸 당연히 알면서도 일단을 돌려봤는데 역시 hold의 향연인걸 관찰할 수 있었다. 그렇게 몇 분 기다리다보니 순간적으로 급락하는 구간을 경험했다.


실제 차트로도 분봉 기준 18:12와 18:15에 0.8%에 가까운 종가 하락이 보였으므로 30초 인터벌씩 밀리는걸 감안하면 True는 꽤 정확하게 처리하고 있음이 드러난다. 그러나 터무니없이 적은 학습 데이터로 충분히 패턴을 학습하지 못한 모델이 sell로 바로 예측하지 못하는 건 놀라운 일은 아니다.


모델의 성능은 뒤로하고 어찌됐든 데이터를 실시간으로 처리해서 모델 성능을 모니터링하는 코드가 실제로 작동은 하는걸로 보인다.
다음은 gpt가 써준 실시간 평가용 코드이다.
# 메인 루프
if __name__ == "__main__":
market = select_top_symbol()
logging.info("Selected top symbol: %s", market)
device = torch.device("mps") if torch.backends.mps.is_available() else "cpu"
model = CNNLSTM(len(FEATURE_COLS))
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.to(device).eval()
logging.info("CNN-LSTM model loaded on device: %s", device)
scaler = joblib.load(SCALER_PATH)
collector = WSTickCollector(market)
collector.start()
logging.info("WebSocket collector started for %s", market)
market_history = pd.DataFrame()
history = deque(maxlen=SEQ_LEN)
last_interval = None
y_true, y_pred = [], []
while True:
time.sleep(INTERVAL_SEC)
ticks, obs = collector.pop_all()
agg = aggregate_interval(ticks, obs)
if agg is None or len(agg) < 2:
continue
# market_history 누적
market_history = pd.concat([market_history, agg]).drop_duplicates('timestamp').reset_index(drop=True)
feat_df = compute_features(market_history)
# 새로 생긴 인터벌만 처리
new_rows = feat_df[feat_df['timestamp'] > (last_interval if last_interval else pd.Timestamp(0, tz='UTC'))]
if new_rows.empty:
continue
for idx, row in new_rows.iterrows():
fv = [row[c] for c in FEATURE_COLS]
fv_scaled = scaler.transform([fv])[0]
history.append(fv_scaled)
# 이전 인터벌 timestamp 업데이트
last_interval = row['timestamp']
# SEQ_LEN 이상이면 바로 예측
if len(history) < SEQ_LEN:
logging.info("[%s] Warming up (%d/%d)", row['timestamp'], len(history), SEQ_LEN)
continue
X = torch.from_numpy(np.array(history, dtype=np.float32)) # history (deque of ndarray) → single numpy array
X = X.unsqueeze(0).to(device) # 배치 차원 추가 + device로 이동
# 모델 예측
with torch.no_grad():
pred = model(X).argmax(1).item()
# 실제 라벨은 이전 인터벌 기준
prev_idx = idx - 1
if prev_idx < 0:
continue
prev_row = feat_df.iloc[prev_idx]
r = prev_row['last_return']
if r >= LABEL_THRESHOLD:
true = ACTION_MAP['buy']
elif r <= -LABEL_THRESHOLD:
true = ACTION_MAP['sell']
else:
true = ACTION_MAP['hold']
y_true.append(true)
y_pred.append(pred)
acc = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='macro')
logging.info(
"[%s] Pred=%s True=%s | Acc=%.3f F1=%.3f",
row['timestamp'], INV_ACTION_MAP[pred], INV_ACTION_MAP[true], acc, f1
)
실시간 검증이라지만 실제 label과 얼마나 맞는지 자동으로 검증하기 위한 코드라 결국 실시간보다 한 인터벌(30초)씩 밀려서 추론하도록 했다. 즉 5분동안 워밍업으로 시퀀스데이터를 입력 받고 다음 30초 구간의 가격에 따른 action을 예측하는데 예측 대상의 인터벌이 현재 기준으로 미리 label 되어있어야하므로 모델은 실시간 추론이 아닌 현재 기준 30초 전의 구간을 추론하고 있는 것이다. 사실상 테스트 데이터셋을 실시간으로 넣어 성능을 측정하는 과정이다.
테스트까지 만족스러운 성능을 보인다면 진정한 실시간 추론에 적용할 수 있다. 실제 실시간 거래에 활용할 때는 평가가 아닌 추론으로 현재 수집한 인터벌을 그대로 입력으로 사용해서 다음 구간을 예측하도록 재구성해야할 것이다.
# 메인 루프
if __name__ == "__main__":
market = select_top_symbol()
logging.info("Selected top symbol: %s", market)
device = torch.device("mps") if torch.backends.mps.is_available() else "cpu"
model = CNNLSTM(len(FEATURE_COLS))
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.to(device).eval()
logging.info("CNN-LSTM model loaded on device: %s", device)
scaler = joblib.load(SCALER_PATH)
collector = WSTickCollector(market)
collector.start()
logging.info("WebSocket collector started for %s", market)
market_history = pd.DataFrame()
history = deque(maxlen=SEQ_LEN)
last_interval = None
while True:
time.sleep(INTERVAL_SEC)
ticks, obs = collector.pop_all()
agg = aggregate_interval(ticks, obs)
if agg is None or len(agg) < 2:
continue
# market_history 누적
market_history = pd.concat([market_history, agg]).drop_duplicates('timestamp').reset_index(drop=True)
feat_df = compute_features(market_history)
# 새로 생긴 인터벌만 처리
new_rows = feat_df[feat_df['timestamp'] > (last_interval if last_interval else pd.Timestamp(0, tz='UTC'))]
if new_rows.empty:
continue
for _, row in new_rows.iterrows():
fv = [row[c] for c in FEATURE_COLS]
fv_scaled = scaler.transform([fv])[0]
history.append(fv_scaled)
last_interval = row['timestamp']
if len(history) < SEQ_LEN:
logging.info("[%s] Warming up (%d/%d)", row['timestamp'], len(history), SEQ_LEN)
continue
# 모델 입력 변환 및 실시간 예측
X = torch.from_numpy(np.array(history, dtype=np.float32)).unsqueeze(0).to(device)
with torch.no_grad():
pred = model(X).argmax(1).item()
action = INV_ACTION_MAP[pred]
logging.info("[%s] Predicted Action: %s", row['timestamp'], action)


22:00부터 쭉 횡보구간이라 hold가 맞지만 모델이 실시간 급등/급락 상황에서 어느정도 따라갈 수 있을지가 핵심이라 큰 의미는 없어보인다. 제대로 된 데이터로 학습시킨 후 실시간 검증에서 합리적인 성능을 보인다면 실시간 추론과 연계해 실제 계좌의 매수, 매도 api와 연결해서 자동 투자도 가능해보인다.
일단 프로그램은 만들어 놓았으니 하루빨리 코인시장이 활력을 되찾아서 양질의 데이터를 수집하고 얼마나 성능이 나올지 돌려보고싶다.
'dev > ai' 카테고리의 다른 글
| Codex Skill 등록하기 - bcpprm 사례로 익히는 워크플로 자동화 가이드 (1) | 2026.05.23 |
|---|---|
| Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (4) (0) | 2025.12.22 |
| Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (3) (0) | 2025.12.22 |
| Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (2) (0) | 2025.12.21 |
| Upbit 웹소켓을 이용한 급등 코인의 움직임 예측 모델 개발 (1) (0) | 2025.12.20 |