2024년 4월 21일 일요일

랭체인과 임베딩 벡터 데이터베이스 아키텍처 및 알고리즘 분석

이 글은 랭체인과 임베딩 벡터 데이터베이스를 분석한 것이다. 이를 통해, LLM 서비스 개발에 필요한 라이브러리 및 데이터베이스 구조를 이해하고, 다양한 분야에 응용할 수 있다. 
이 글은 다음 라이브러리 및 플랫폼 아키텍처 및 알고리즘을 분석한다. 
  • 랭체인
  • 벡터 데이터베이스
  • OLLAMA
  • LLAMA2.c

이 글에 관련된 용어와 상세 개념은 다음 링크를 참고한다. 

머리말


본론


마무리


레퍼런스

2024년 4월 11일 목요일

도커 이미지 빌드 시 문제 해결 솔류션 정리

이 글은 도커 이미지 빌드 시 문제 해결 솔류션을 정리한 글이다. 


도커 컨테이너 이미지는 개발, 운영 환경을 독립적으로 실행할 수 있는 가상화를 지원한다. 다만, 도커 이미지 빌드 시 설치되는 수많은 패키지에 의존되므로, 여러 에러가 발생된다. 대표적인 에러는 다음과 같다. 
  1. 'DEBIAN_FRONTEND=noninteractive' not working inside shell script with apt-get
  2. How can I set the default timezone
  3. Docker build failed with Hash Sum mismatch error
  4. Docker build failed with error "Hash Sum mismatch"
  5. throws an error saying "Some index files failed to download. They have been ignored or old ones used instead."
  6. Error while loading conda entry point: conda-libmamba-solver (libarchive.so.19: cannot open shared object file: No such file or directory)
  7. OpenGL, 3D graphics problems in docker
  8. Windows Volume mount
이외에도 매우 다양한데, 사실, 설치되는 패키지들이 많은 도커 이미지는 그만큼 원인이 많다. 스택 오버플로우 관련 댓글을 보면 알겠지만, 누구는 이런 문제로 몇 일이 날라갔다 등의 성토글을 볼 수 있다. 그만큼 원인이 다양하게 조합될 수 있다.

앞의 에러 솔류션만 정리해 본다. 

1번은 도커 빌드 시 [yes/no] 프롬프트 입력 상황에서 발생한다. 도커 빌드 중에는 키보드 입력이 안된다. 그러므로, 다음과 같이 인터렉티브가 없다고 설정하면 해결된다.
ARG DEBIAN_FRONTEND=noninteractive

2번은 타임존 문제로 이는 다음과 같이 설정한다. 타임존이 제대로 설정되어 있지 않으면, 패키지 설치에 실패할 수 있다. 
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ENV TZ=Asia/Seoul

3, 4번 해쉬섬 에러 문제는 보통 우분투나 리눅스의 apt-get update 시 발생하는 데, stack overflow에 검색해 보아도 관련 솔류션을 시도해도 잘 처리 안되는 경우가 발생한다. 

이 경우는 다음과 같은 솔류션을 apt-get update 전에 dockerfile에 정의하라고 되어 있는 답변이 많다. 

RUN rm -rf /var/lib/apt/lists/*

RUN echo "Acquire::http::Pipeline-Depth 0;" > /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::http::No-Cache true;" >> /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::BrokenProxy    true;" >> /etc/apt/apt.conf.d/99custom

RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until

이렇게 해도 해결안되면, 기본이 되는 이미지를 우분투 과거 버전(예. 20.04), 혹은 미리 패키지 설치된 버전으로 설정하고 다시 시도한다. 
FROM ubuntu:20.04
FROM continuumio/miniconda3

이 문제는 무언가 해당 패키지를 배포하는 서버에서 문제가 있거나, 도커 빌드 컴퓨터의 시간이 안맞는 등의 문제로 예상된다. 

5번은 도커 이미지 빌드 시 동작되는 방식에 원인이 있다. 도커는 빌드 과정을 재활용하기 위해 레이어, 캐쉬 등을 사용하는 데, 가끔 이 부분이 꼬이는 경우가 있다. 이 경우, 다음과 같이 빌드해 본다 .
docker build --no-cache -t <docker_name> .

안되면, 현재 도커 이미지들을 모두 삭제하거나, 다음과 같이 clean, purge를 한다.
docker image rm -f
docker system prune -a -f

6번은 미리 설치된 아나콘다 이미지를 사용하라고 권장한다. 다음은 그 예이다.
FROM continuumio/miniconda3

7번은 호스트 서버가 3D GPU가 지원되지 않으면 아직 뚜렷한 해결방법은 없다. 도커 이미지는 본질적으로 3D GUI용으로 사용되도록 개발된 것이 아니다. 3D GUI로 이미지를 생성, 저장하는 등의 기능은 기본적으로 도커에서는 처리되지 않는다. 
이를 우회적으로 지원하는 몇 가지 방법이 있다. VirtualGL(ref), Nvidia GPU docker 등이 그러하다. 하지만, 도커가 실행되는 호스트 서버에 설치된 그래픽카드에 따라 이런 옵션이 동작되지 않을 수도 있다. 

8번은 윈도우에서 호스트 폴더와 도커 내 폴더를 마운트할 때 발생하는 문제이다. 황당하게도, 우분투에는 문제 없는 볼륨 마운트가 윈도우에는 별도 설정을 해줘야 한다. 다음과 같이 도커 설정 메뉴에서 Resource 메뉴의 File sharing 경로를 추가해야 -v 마운트 옵션이 동작된다. 

아울러, 윈도우에서는 경로 설정이 우분투와 다르므로, 아래와 같이 절대 경로 지정해야 한다.
docker run -v c:/input_data:/app/input -it <docker_image_name> python app_program.py --input ./input/data.json 


레퍼런스

2024년 4월 8일 월요일

시계열 데이터 예측을 위한 트랜스포머 모델 개발하기

이 글은 시계열 트랜스포머 모델 개념 및 활용 방법을 간략히 정리한다.

트랜스포머 개념도

머리말
트랜스포머의 원리를 이해하고 있다면, 수치로 표현된 시계열 벡터도 라벨링된 시계열 벡터가 있다면, 이를 학습할 수 있다는 것을 이해할 것이다. 

이 글은 트랜스포머 모델을 이용해 시계열을 학습하는 방법을 실습한다. 트랜스포머의 세부적인 기술은 다음 링크 및 레퍼런스를 참고한다. 
개발 환경
이 글을 실습하기 위해서는 기본적으로 다음 개발환경이 준비되어야 한다. 
  • NVIDIA GPU driver
  • Ubuntu or Windows CUDA driver 
  • PyTorch
  • Pandas, matplotlib
본론
시계열 데이터에서 패턴을 학습하고, 다음 값을 예측하는 방법은 여러가지가 있다. 트랜스포머의 경우에도, 트랜스포머 분류 기법을 이용하는 방법, Temporal Fusion Transformer를 사용하는 방법 등이 있다.

여기에서는 기본 개념을 먼저 이해하기 위해, 트랜스포머 분류 기법을 이용해 학습한다. 학습될 데이터는 다음과 같다. 

이 데이터는 페이스북 주식 종가, 개장가 등을 다운로드받은 FB_raw.csv 엑셀파일을 그래프화한 것이다. 데이터셋 크기는 160,681레코드이다. 실제 데이터 구조는 다음과 같다. 

데이터를 미리 다운로드한다.
알고리즘 순서는 다음과 같다.
  1. 주가 시계열 데이터 로딩
  2. 학습 및 테스트 데이터 생성
  3. 벡터 임베딩 함수 정의
  4. 트랜스포머 모델 정의: 포지션 인코딩, 인코더, 주가 예측을 위한 linear full connection 정의. 타겟 마스크 정의
  5. 데이터 학습
  6. 테스트 데이터 예측 결과 확인 
코딩을 위해, 파이썬 파일을 생선한다. 그리고, 다음과 같이 패키지를 임포트한다. 
import torch, torch.nn as nn
import numpy as np
import pandas as pd
import os, time, math
import matplotlib.pyplot as plt
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Intel Math Kernel Library use

학습될 데이터 형식을 설정한다. 학습 데이터는 10개, 예측 데이터는 1개이다. 
input_window = 10 # number of input steps
output_window = 1 # number of prediction steps, in this model its fixed to one
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 

페이스북 데이터를 읽고, 그래프형식으로 보여준다. 
# get this python's directory
module_path = os.path.dirname(__file__)
df = pd.read_csv(module_path + '/FB_raw.csv') # data path of facebook stock price (Apr 2019 - Nov 2020)
close = np.array(df['close'])
logreturn = np.diff(np.log(close)) # Transform closing price to log returns, instead of using min-max scaler
csum_logreturn = logreturn.cumsum() # Cumulative sum of log returns

fig, axs = plt.subplots(2, 1)
axs[0].plot(close, color='red')
axs[0].set_title('Closing Price')
axs[0].set_ylabel('Close Price')
axs[0].set_xlabel('Time Steps')

axs[1].plot(csum_logreturn, color='green')
axs[1].set_title('Cumulative Sum of Log Returns')
axs[1].set_xlabel('Time Steps')

fig.tight_layout()
plt.show()

벡터 임베딩 계산을 위해, 트랜스포머의 포지션 인코딩을 정의한다.  
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()       
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]        

트랜스포머 기반 시계열 학습 모델을 정의한다. 트랜스포머 정의 그대로 정의되며, decoder부분만 다르다. 이 부분은 트랜스포머 결과를 받아, 예측값 1개를 생성하는 full connection 레이어다. 
class transformer_seq(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(transformer_seq, self).__init__()
        self.model_type = 'Transformer'
        
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)   
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)        
        self.decoder = nn.Linear(feature_size,1)
        self.init_weights()  # decoder FC층 가중치 초기화

    def init_weights(self):
        initrange = 0.1    
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self,src):
        if self.src_mask is None or self.src_mask.size(0) != len(src):  # 목표 마스크 생성
            device = src.device
            mask = self.generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src,self.src_mask)
        output = self.decoder(output)
        return output

    def generate_square_subsequent_mask(self, sz):  # 목표 마스크 생성 함수
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask        

학습될 데이터를 생성한다. 주가 데이터에서 input은 10개 수치, output은 1개 수치이다.
def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+output_window:i+tw+output_window]
        inout_seq.append((train_seq ,train_label))
    return torch.FloatTensor(inout_seq)

학습될 데이터를 train, test dataset으로 분할하는 함수를 정의한다. 
def get_data(data, split):
    series = data
    
    split = round(split*len(series))
    train_data = series[:split]
    test_data = series[split:]

    train_data = train_data.cumsum()
    train_data = 2*train_data # 학습 데이터 값을 2배 증폭함으로써 학습 정확도를 높인다.
    test_data = test_data.cumsum()

    train_sequence = create_inout_sequences(train_data,input_window)
    train_sequence = train_sequence[:-output_window]

    test_data = create_inout_sequences(test_data,input_window)
    test_data = test_data[:-output_window]

    return train_sequence.to(device), test_data.to(device)

학습될 배치 데이터를 리턴하는 함수를 정의한다.
def get_batch(source, i, batch_size):
    seq_len = min(batch_size, len(source) - 1 - i)
    data = source[i:i+seq_len]    
    input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
    target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
    return input, target

학습 함수를 정의한다.
def train(train_data):
    model.train() # Turn on the evaluation mode
    total_loss = 0.
    start_time = time.time()

    for batch, i in enumerate(range(0, len(train_data) - 1, batch_size)):  # 배치크기만큼 루프
        data, targets = get_batch(train_data, i,batch_size)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
        optimizer.step()

        total_loss += loss.item()
        log_interval = int(len(train_data) / batch_size / 5)
        if batch % log_interval == 0 and batch > 0:  # 배치 루프 시 loss, 정확도 출력
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.10f} | {:5.2f} ms | loss {:5.7f}'.format(
                    epoch, batch, len(train_data) // batch_size, scheduler.get_lr()[0], elapsed * 1000 / log_interval,
                    cur_loss))
            total_loss = 0
            start_time = time.time()

평가 함수를 정의한다.
def evaluate(eval_model, data_source):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    eval_batch_size = 1000
    with torch.no_grad():
        for i in range(0, len(data_source) - 1, eval_batch_size):
            data, targets = get_batch(data_source, i, eval_batch_size)
            output = eval_model(data)            
            total_loss += len(data[0])* criterion(output, targets).cpu().item()
    return total_loss / len(data_source)

학습 모델 기반 데이터 예측 함수를 정의한다. 
def model_forecast(model, seqence):
    model.eval() 
    total_loss = 0.
    test_result = torch.Tensor(0)    
    truth = torch.Tensor(0)

    seq = np.pad(seqence, (0, 3), mode='constant', constant_values=(0, 0))
    seq = create_inout_sequences(seq, input_window)
    seq = seq[:-output_window].to(device)

    seq, _ = get_batch(seq, 0, 1)
    with torch.no_grad():
        for i in range(0, output_window):            
            output = model(seq[-output_window:])                        
            seq = torch.cat((seq, output[-1:]))

    seq = seq.cpu().view(-1).numpy()
    return seq

실 데이터를 이용한 데이터 예측 함수를 정의한다.
def forecast_seq(model, sequences):
    """Sequences data has to been windowed and passed through device"""
    start_timer = time.time()
    model.eval() 
    forecast_seq = torch.Tensor(0)    
    actual = torch.Tensor(0)
    with torch.no_grad():
        for i in tqdm(range(0, len(sequences) - 1)):
            data, target = get_batch(sequences, i, 1)
            output = model(data)            
            forecast_seq = torch.cat((forecast_seq, output[-1].view(-1).cpu()), 0)
            actual = torch.cat((actual, target[-1].view(-1).cpu()), 0)
    timed = time.time()-start_timer
    print(f"{timed} sec")

    return forecast_seq, actual

학습 데이터를 준비한다.
train_data, val_data = get_data(logreturn, 0.6) # 60% train, 40% test split

모델, Loss함수, 하이퍼파라메터, 스케쥴 등을 정의한다.
model = transformer_seq().to(device)
criterion = nn.MSELoss() # Loss function
lr = 0.00005 # learning rate
epochs =  500 # Number of epochs

optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(train_data)
    
    if(epoch % epochs == 0): # 에폭마다 모델 평가
        val_loss = evaluate(model, val_data)
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss: {:5.7f}'.format(epoch, (time.time() - epoch_start_time), val_loss))
        print('-' * 80)
    else:   
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s'.format(epoch, (time.time() - epoch_start_time)))
        print('-' * 80)

    scheduler.step() 

학습 후 모델 이용해 데이터 예측하고, 실제 데이터와 비교한다.
test_result, truth = forecast_seq(model, val_data)

plt.plot(truth, color='red', alpha=0.7)
plt.plot(test_result, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

테스트를 위해 랜덤값을 이용해 비교해 본다. 
r = np.random.randint(100000, 160000)
test_forecast = model_forecast(model, csum_logreturn[r: r+10]) # random 10 sequence length

print(f"forecast sequence: {test_forecast}")
print(f"Actual sequence: {csum_logreturn[r: r+11]}")
torch.save(model.state_dict(), "transformer_ts_20231211.pth")

학습 파일을 로딩한 후, 다른 시계열 데이터셋도 테스트해본다. 
model2 = TransAm() # rename as model2
model2.load_state_dict(torch.load("transformer_ts_20231211.pth"))
model2.to(device)

df2 = pd.read_csv(module_path + '/BA_raw.csv') # 보잉 주식 테스트
close2 = df2['close'].fillna(method = 'ffill')
close2 = np.array(close2)
logreturn2 = np.diff(np.log(close2))

train_data2, val_data2 = get_data(logreturn2, 0.6)
test2_eval = evaluate(model2, val_data2)
print(f'Test 2 loss: {test2_eval :.5f}')

test_result2, truth2 = forecast_seq(model2, val_data2)

plt.plot(truth2, color='red', alpha=0.7)
plt.plot(test_result2, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

df3 = pd.read_csv(module_path + '/JPM_raw.csv') # JPMorgan Chase & Co 주식 테스트
close3 = df3['close'].fillna(method = 'ffill')
close3 = np.array(close3)
logreturn3 = np.diff(np.log(close3))

train_data3, val_data3 = get_data(logreturn3, 0.6)
test3_eval = evaluate(model2, val_data3)
print(f'Test 3 loss: {test3_eval :.5f}')

test_result3, truth3 = forecast_seq(model2, val_data3)

plt.plot(truth3, color='red', alpha=0.7)
plt.plot(test_result3, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

결과는 다음과 같이, 잘 학습되어, 데이터셋을 예측하고 있는 것을 알 수 있다. 
학습 과정 화면
테스트 데이터셋 예측 결과(페이스북 주가)

전혀 다른 주식 종목도 예측해 보자. 다음은 보잉 주가이다. 
보잉 주가 예측
다음은 제이슨 모건 주식이다. 
제이슨 모건 주가 예측

결과적으로 학습 모델이 패턴을 잘 예측한다. 

레퍼런스

2024년 4월 1일 월요일

FastAPI, Uvicorn, Websocket 기반 Open API 서버 간단히 개발해 보기

이 글은 FastAPI, Uvicorn, Websocket 기반 Open API 서버 간단히 개발하는 방법을 정리한다. FastAPI를 이용하면, 매우 쉽게 Open API 서버를 개발할 수 있다. 
FastAPI 사용 사례 아키텍처(Prof Ai | Devpost)

FastAPI는 비동기 API 서버를 지원하며, uvicorn과 같은 ASGI 서버를 사용하여 실행할 수 있다. 이를 통해 빠른 성능과 비동기 처리를 구현할 수 있다. 자동 대화형 API 문서도 제공되어 개발자가 API를 쉽게 이해하고 사용할 수 있다.

FastAPI는 다음 웹 어플리케이션 프레임웍인 Flask, Django와 함께 활용하면 좋다.
패키지 설치는 다음과 같다. 
pip install fastapi aiohttp uvicorn

서버 개발
크게 2개 유형의 API를 개발한다. 하나는 오래 걸리는 계산 함수, 다른 하나는 대용량 데이터 파일 전달이다. 대용량 파일은 청크로 분할해 전달한다. server.py 코드는 다음과 같다. 

import json, time, logging
from fastapi import FastAPI, BackgroundTasks, WebSocket
from fastapi.middleware.cors import CORSMiddleware

# Set up logging to debug
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# uvicorn open_api_server:app --reload --port 8001 --ws-max-size 16777216   # https://www.uvicorn.org
app = FastAPI()

# CORS middleware. considering security
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],  # Allows all origins
allow_credentials=True,
allow_methods=["*"],  # Allows all methods
allow_headers=["*"],  # Allows all headers
)
def calculate_dataset(length):
logger.debug('Calculation started...')

time.sleep(30)  # 30 seconds
# TODO call your calculation functions
data = 3.14

logger.debug(f'End')
return data

@app.post("/v1/calc/dataset")
async def calculate_dataset(background_tasks: BackgroundTasks, length: str):
logger.debug('Calculation started...')
results = calculate_dataset(length)
return {"results": results}

@app.websocket("/ws/dataset") # don't remove /ws prefix to use websocket
async def websocket_endpoint(websocket: WebSocket):
logger.debug('Trying to connect...')
await websocket.accept()
logger.debug('Connection accepted.')
with open('output_xml.json') as json_file:
data = json.load(json_file)
data = json.dumps(data)

logger.debug('Message length: ' + str(len(data)))
CHUNK_SIZE = 64 * 1024  # 64KB
for i in range(0, len(data), CHUNK_SIZE):
chunk = data[i:i+CHUNK_SIZE]
print(f'chunk: {i}')
await websocket.send_text(chunk)
logger.debug('JSON data sent.')

다음과 같이 실행한다.
uvicorn open_api_server:app --reload --port 8001

클라이언트 개발
대용량 데이터와 결과를 클라이언트에서 처리할 때는 타임아웃 설정과 청크 다운로드 루프를 구현해야 한다. client.py를 코딩한다.
import json, traceback, asyncio, websockets, aiohttp # import httpx
from aiohttp import ClientSession, ClientTimeout

async def call_calc_dataset():
    params = {"length": "10"}
    t = ClientTimeout(total=60*2)  # 2 minutes
    async with aiohttp.ClientSession(timeout=t) as session:
        async with session.post('http://localhost:8001/v1/calc/dataset', params=params) as resp:
            results = await resp.text()
            print(results)

async def connect():
    async with websockets.connect('ws://localhost:8001/ws/dataset') as websocket:
        CHUNK_SIZE = 64 * 1024  # 64KB
        full_data = None
        received_count = 0
        try:
            while True:
                chunk = await asyncio.wait_for(websocket.recv(), timeout=5) # adjust timeout value considering internet speed
                if full_data is None:
                    full_data = chunk
                else:
                    full_data += chunk
                received_count += len(chunk) 
                print('Received data: ', received_count)
        except asyncio.TimeoutError:
            print("Timeout error: The server didn't respond within 5 seconds")
        except Exception as e:
            print(e)
            pass
        
        print('Received data: ', received_count)
        data = json.loads(full_data)
        try:
            os.remove('big_xml.json')
        except:
            pass
        with open('big_xml.json', 'w') as json_file:
            json.dump(data, json_file)
            print('JSON data saved to file.')

def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(call_calc_dataset())
        loop.run_until_complete(connect())
    except Exception as e:
        print(traceback.format_exc())

if __name__ == '__main__':
    main()

다음과 같이 실행한다.
python client.py

서버와 클라이언트에서 실행 결과는 다음과 같다. 다음과 같으면 정상 동작되는 것이다. 


마무리
FastAPI는 Sebastián Ramírez라는 개발자에 의해 처음 개발되었다. 그는 현재 독일 베를린에 살고 있고, 많은 오픈소스 기여활동으로 유명하다.

레퍼런스

2024년 3월 30일 토요일

몬테카를로 시뮬레이션 기반 주가 예측

이 글은 몬테카를로 시뮬레이션 기반 주가 예측하는 방법을 보여준다. 

다음은 주가를 다운로드하여, 평균, 편차를 계산한 후, 95% 수준에서 시뮬레이션을 수행해, 수익률을 예측하는 코드이다. 
import numpy as np
import matplotlib.pyplot as plt
from eodhd import APIClient
import config as cfg

api = APIClient(cfg.API_KEY)


def get_ohlc_data():
    df = api.get_historical_data("GSPC.INDX", "d", results=365)
    return df


if __name__ == "__main__":
    df = get_ohlc_data()

    # Calculate daily returns
    daily_returns = df["adjusted_close"].pct_change().dropna()

    # Simulation parameters
    initial_investment = 10000  # Initial investment amount
    num_simulations = 1000  # Number of simulations
    forecast_days = 365  # Investment horizon in days
    desired_return = 0.10  # Desired return (10%)

    # Calculate the average daily return
    average_daily_return = daily_returns.mean()

    # Calculate volatility as the standard deviation of daily returns
    volatility = daily_returns.std()

    print(f"Average Daily Return: {average_daily_return}")
    print(f"Volatility: {volatility}")

    # Simulating future returns
    simulated_end_returns = np.zeros(num_simulations)
    for i in range(num_simulations):
        random_returns = np.random.normal(
            average_daily_return, volatility, forecast_days
        )
        cumulative_return = np.prod(1 + random_returns)
        simulated_end_returns[i] = initial_investment * cumulative_return

    # Calculate the final investment values
    final_investment_values = simulated_end_returns

    # Calculate Value at Risk (VaR) and Expected Tail Loss (Conditional VaR)
    confidence_level = 0.95
    sorted_returns = np.sort(final_investment_values)
    index_at_var = int((1 - confidence_level) * num_simulations)
    var = initial_investment - sorted_returns[index_at_var]
    conditional_var = initial_investment - sorted_returns[:index_at_var].mean()

    print(f"Value at Risk (95% confidence): £{var:,.2f}")
    print(f"Expected Tail Loss (Conditional VaR): £{conditional_var:,.2f}")

    num_success = np.sum(
        final_investment_values >= initial_investment * (1 + desired_return)
    )
    probability_of_success = num_success / num_simulations

    print(
        f"Probability of achieving at least a {desired_return*100}% return: {probability_of_success*100:.2f}%"
    )

    plt.figure(figsize=(10, 6))
    plt.hist(final_investment_values, bins=50, alpha=0.75)
    plt.axvline(
        initial_investment * (1 + desired_return),
        color="r",
        linestyle="dashed",
        linewidth=2,
    )
    plt.axvline(initial_investment - var, color="g", linestyle="dashed", linewidth=2)
    plt.title("Distribution of Final Investment Values")
    plt.xlabel("Final Investment Value")
    plt.ylabel("Frequency")
    plt.show()

    """
    # Plotting the results
    plt.figure(figsize=(10, 6))
    plt.plot(simulations.T, color="blue", alpha=0.025)
    plt.title("Monte Carlo Simulation of Future Prices")
    plt.xlabel("Day")
    plt.ylabel("Price")
    plt.show()
    """

레퍼런스

2024년 3월 22일 금요일

무료 LiDAR 포인트 클라우드 다운로드 웹사이트 소개

이 글은 무료 LiDAR(라이다) 포인트 클라우드 제공 다운로드 웹사이트를 소개한다. 

OpenTopography
OpenTopography는 지구 과학 연구와 교육을 위해 고해상도 지형 데이터를 액세스하고 활용할 수 있는 플랫폼이다. 이 웹사이트에서는 뉴질랜드의 라이다 데이터와 관련된 새로운 데이터셋도 제공하고 있다.

USGS Earth Explorer
USGS Earth Explorer는 위성, 항공기 및 기타 원격 감지 데이터를 검색, 발견 및 주문할 수 있는 웹 응용 프로그램이다. 지리 정보 시스템(GIS) 전문가, 연구원 및 교육자들이 이 웹사이트를 활용하여 지구 환경에 대한 데이터를 탐색하고 활용할 수 있다.

The United States Interagency Elevation Inventory
이 웹사이트는 미국 및 그 영토에 대한 고정밀 지형 및 해저 지형 데이터를 제공하는 국가 간 고도 인벤토리이다. 이러한 데이터는 연구, 환경 관리 및 재난 대응에 활용된다.

NOAA Digital Coast
NOAA Digital Coast는 연안 문제를 해결하기 위해 필요한 데이터, 도구 및 교육을 제공하는 웹 플랫폼이다. 연안 지역의 환경, 생태계 및 공간 정보를 탐색하고 활용할 수 있다.

National Ecological Observatory Network (NEON)
NEON은 미국 내의 다양한 위치에서 생태학적 및 기후학적 관측을 수행하는 네트워크이다. 이러한 데이터는 지속적인 연구와 교육에 활용된다.

Equator LIDAR Data Online
Equator는 LIDAR 데이터를 제공하는 플랫폼이다. 이 웹사이트에서는 다양한 지역의 지형 데이터를 탐색하고 활용할 수 있다.

레퍼런스

2024년 3월 21일 목요일

허깅페이스 트랜스포머 라이브러리 사용법 총정리

이 글은 그 동안 틈틈히 사용했었던 허깅페이스 트랜스포머 라이브러리 사용법을 총정리해본다. 이 글은 QA 시스템, 문장 분류, 토큰 의미 예측 방법 등을 소개한다. 이 글은 생성AI의 핵심 아키텍처 모델인 트랜스포머를 잘 이용하고 싶은 개발자, 연구자, 학생들에게 도움이 된다. 

만약, 트랜스포머의 내부 동작 메커니즘이 궁금하다면, 다음 링크를 참고한다.

소개
허깅페이스는 트랜스포머 라이브러리를 개발하여, 관련 최신 모델을 다운로드하고, 쉽게 학습 및 사용할 수 있는 API 도구를 제공한다. 

트랜스포머를 통해 다음 기능을 구현할 수 있다. 
훈련 데이터셋 다운로드 제공 예(yelp_review_full · Datasets at Hugging Face)


지원되는 모델
아래 표를 보면 알겠지만, LLM(Large Language Model) 뿐 아니라 생성AI와 관련된 모든 사전 학습된 모델을 제공한다(상세 참고). 
라이브러리 설치 방법
이 라이브러리 사용을 위해서는 NVIDIA GPU, CUDA, PyTorch, Tensorflow 사전 설치가 필요하다. 
명령행 터미널을 실행하고, 가상환경에서 다음을 입력해, 라이브러리를 설치한다.
pip install transformers[torch] datasets evaluate

설치하면, 추론 inference 등에 사용할 수 있는 파이프라인 객체를 사용할 수 있다. 파이프라인은 대부분 학습된 모델의 복잡성을 간소화한 API를 제공한다. 인식, 마스킹 언어 모델링, 특징 추출이 가능하다. 다음 코드는 그 예이다. 
import transformers, torch
from transformers import pipeline

print(transformers.__version__)
print(torch.cuda.is_available())

pipe = pipeline("text-classification", model="nlptown/bert-base-multilingual-uncased-sentiment", device=0)
msg = pipe("This restaurant is awesome")
print(msg)

다음과 같이 출력되면 성공한 것이다. 

트러블슈팅
현재 시점에서 텐서플로우 GPU버전은 패키지 의존성 에러가 발생한다. 참고로, pipeline에서 model과 device=0를 명시하면, 파이토치 GPU 버전으로 실행된다. 

방화벽으로 인해 오프라인 환경에서 실행해야 한다면 다음과 같이 설정 후 실행한다.
HF_DATASETS_OFFLINE=1 TRANSFORMERS_OFFLINE=1 \
python examples/pytorch/translation/run_translation.py --model_name_or_path google-t5/t5-small --dataset_name wmt16 --dataset_config ro-en ...

질의 응답 자료 학습
ChatGPT와 같이 질의 응답 자료를 학습 해본다. 학습된 모델은 허깅페이스의 허브에 저장해 놓고 재활용하기로 한다. 이를 위해, 다음과 같이 허깅페이스 토큰을 생성한다. 


이제, 다음과 같이 하습 데이터셋 다운로드 코딩한다. 
from huggingface_hub import notebook_login  # 허깅페이스 모델 업로드 위한 임포트
notebook_login()

from datasets import load_dataset  # squad 데이터셋 다운로드
squad = load_dataset("squad", split="train[:5000]")
squad = squad.train_test_split(test_size=0.2)
print(squad['train'][0])

다음과 같이 토크나이저에 훈련 텍스트 입력 처리를 위해, 최대 길이를 절단하는 등의 처리 함수를 코딩한다.
from transformers import AutoTokenizer  #  distilbert 토큰나이저 임포트
tokenizer = AutoTokenizer.from_pretrained("distilbert/distilbert-base-uncased") 

def preprocess_function(examples):
    questions = [q.strip() for q in examples["question"]]
    inputs = tokenizer(
        questions,
        examples["context"],
        max_length=384,
        truncation="only_second",
        return_offsets_mapping=True,
        padding="max_length",
    )

    offset_mapping = inputs.pop("offset_mapping")
    answers = examples["answers"]
    start_positions = []
    end_positions = []

    for i, offset in enumerate(offset_mapping):
        answer = answers[i]
        start_char = answer["answer_start"][0]
        end_char = answer["answer_start"][0] + len(answer["text"][0])
        sequence_ids = inputs.sequence_ids(i)

        # Find the start and end of the context
        idx = 0
        while sequence_ids[idx] != 1:
            idx += 1
        context_start = idx
        while sequence_ids[idx] == 1:
            idx += 1
        context_end = idx - 1

        # If the answer is not fully inside the context, label it (0, 0)
        if offset[context_start][0] > end_char or offset[context_end][1] < start_char:
            start_positions.append(0)
            end_positions.append(0)
        else:
            # Otherwise it's the start and end token positions
            idx = context_start
            while idx <= context_end and offset[idx][0] <= start_char:
                idx += 1
            start_positions.append(idx - 1)

            idx = context_end
            while idx >= context_start and offset[idx][1] >= end_char:
                idx -= 1
            end_positions.append(idx + 1)

    inputs["start_positions"] = start_positions
    inputs["end_positions"] = end_positions
    return inputs

데이터셋을 앞의 정의된 함수로 전처리한다. 필요하지 않은 열은 제거한다. 
tokenized_squad = squad.map(preprocess_function, batched=True, remove_columns=squad["train"].column_names)

학습을 위해 배치를 만든다. 
from transformers import DefaultDataCollator
data_collator = DefaultDataCollator()

from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer
model = AutoModelForQuestionAnswering.from_pretrained("distilbert/distilbert-base-uncased")  # 사전 학습 모델을 파인튜닝한다

training_args = TrainingArguments(
    output_dir="my_awesome_qa_model",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    push_to_hub=True,
)  # 학습될 모델 및 파라메터 명시

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_squad["train"],
    eval_dataset=tokenized_squad["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)  # 전이학습모델, 파라메터, 데이터셋, 토크나이져, 배치처리를 위한 데이터 컬렉터 설정

trainer.train()  # 학습
trainer.push_to_hub()  # 학습된 데이터를 허브에 업로드

그 결과 다음과 같이 파인 튜닝된 학습 모델이 허브에 업로드되었다. 

다음과 같이 실행한다.
from transformers import pipeline

question = "How many programming languages does BLOOM support?"
context = "BLOOM has 176 billion parameters and can generate text in 46 languages natural languages and 13 programming languages."

question_answerer = pipeline("question-answering", model="my_awesome_qa_model")
question_answerer(question=question, context=context)

결과가 다음과 같다면 성공한 것이다.

NER(Named Entity Recognition) BERT 모델 파인튜닝
각 문장의 토큰 엔티티 의미를 인식하는 모델을 NER이라 한다. 문장 토큰의 의미를 인식할 수 있다면, 문장에서 특정 토큰에 대한 의미를 알고, 해당 의미에 대한 에이전트를 자동 실행하는 등의 서비스를 쉽게 개발할 수 있다. 

코드 개발 전에 다음을 설치한다. 
pip install transformers[torch] datasets

이 예제는 NER 데이터셋을 다운로드하고, BERT-BASE-NER(참고) 사전학습모델을 이용해 파인튜닝한다. BERT-BASE-NER 모델 사용방법은 다음과 같다. 
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline

tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
model = AutoModelForTokenClassification.from_pretrained("dslim/bert-base-NER")

nlp = pipeline("ner", model=model, tokenizer=tokenizer)
example = "My name is Wolfgang and I live in Berlin"

ner_results = nlp(example)
print(ner_results)

파인튜닝 데이터셋은 희귀한 토큰 의미를 정의한 데이터셋이다. 
NER 의미는 다음과 같다. 
Person
Location (including GPE, facility)
Corporation
Consumer good (tangible goods, or well-defined services)
Creative work (song, movie, book, and so on)
Group (subsuming music band, sports team, and non-corporate organisations)

이는 코드 상해 좀 더 상세화되어 표현되었다.
0: O
1: B-corporation
2: I-corporation
3: B-creative-work
4: I-creative-work
5: B-group
6: I-group
7: B-location
8: I-location
9: B-person
10: I-person
11: B-product
12: I-product

각 B, I, O는 Begin location, Inside location, Outside를 의미한다(NER 연구에 자주 나오는 용어). 참고로, New York City is a bustling metropolis의 NER 해석은 다음과 같다. 
New: O (Outside of any named entity)
York: B-LOC (Beginning of a Location entity)
City: I-LOC (Inside of a Location entity)
is: O
a: O
bustling: O
metropolis: O 

사용된 데이터셋은 다음과 같이 문장 토큰에 대한 NER이 정의되어 있다. 

이제, 다음 파인튜닝 코드를 입력해 실행, 학습한다. 
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
def tokenize_and_align_tags(records):
    # 입력 단어를 토큰으로 분리함. 예를 들어, ChatGPT경우, ['Chat', '##G', '##PT']로 분해
    tokenized_results = tokenizer(records["tokens"], truncation=True, is_split_into_words=True)
    input_tags_list = []

    for i, given_tags in enumerate(records["ner_tags"]):
        word_ids = tokenized_results.word_ids(batch_index=i)
        previous_word_id = None
        input_tags = []

        for wid in word_ids:
            if wid is None:
                input_tags.append(-100)
            elif wid != previous_word_id:
                input_tags.append(given_tags[wid])
            else:
                input_tags.append(-100)
            previous_word_id = wid

        input_tags_list.append(input_tags)

    tokenized_results["labels"] = input_tags_list
    return tokenized_results

from datasets import load_dataset
wnut = load_dataset('wnut_17')
tokenized_wnut = wnut.map(tokenize_and_align_tags, batched=True)
tag_names = wnut["test"].features[f"ner_tags"].feature.names
id2label = dict(enumerate(tag_names))
label2id = dict(zip(id2label.values(), id2label.keys()))

from transformers import AutoModelForTokenClassification
model = AutoModelForTokenClassification.from_pretrained(
    "dslim/bert-base-NER", num_labels=len(id2label), id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True
)

from transformers import Trainer, TrainingArguments, DataCollatorForTokenClassification
training_args = TrainingArguments(
    output_dir="my_finetuned_wnut_model",
)
''' # 하이퍼파라메터 튜닝용
training_args = TrainingArguments(
    output_dir="my_finetuned_wnut_model_1012",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=2,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)'''
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)     
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_wnut["train"],
    eval_dataset=tokenized_wnut["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()

from transformers import pipeline
model = AutoModelForTokenClassification.from_pretrained("my_finetuned_wnut_model/checkpoint-1000")
tokenizer = AutoTokenizer.from_pretrained("my_finetuned_wnut_model/checkpoint-1000")

classifier = pipeline("ner", model=model, tokenizer=tokenizer)
out = classifier("My name is Sylvain and I work at Hugging Face in Brooklyn.")
print(out)

import evaluate
seqeval = evaluate.load("seqeval")
results = trainer.evaluate()
print(results)
trainer.push_to_hub()

실행 결과는 다음과 같다. 

다음과 같이 문장 각 토큰의 의미(예. 사람, 위치 등)를 높은 정확도로 제공해 준다. 

마무리
이 글은 허깅페이스 트랜스포머를 이용해 다양한 사전학습 모델을 이용해 여러가지 서비스 개발 방법을 살펴보았다. 자체 데이터로 학습하거나, 데이터 학습을 좀 더 많이 할 수록 결과가 더 정확하고, 전문적이며, 풍부해 질 것이다.

레퍼런스