Building on AWS

12. 생성형 AI와 현대적 데이터 아키텍처 통합 및 SQL·API 쿼리 프롬프트 활용

Window to the world 2025. 1. 16. 13:46
반응형

Modern Data Architectures와 Generative AI 통합

 

본 글에서는 대형 언어 모델(LLM)을 사용하여 AWS 데이터베이스, 데이터 스토어, 및 데이터 웨어하우징 솔루션(예: Amazon Athena)과 상호작용하는 방법을 설명합니다. 본 설명에서 제공하는 방법을 통해 SQL 쿼리를 생성하고 실행하거나 API 엔드포인트에 요청을 보냄으로써 데이터를 처리합니다.

 


1. 라이브러리 설치

 

SQLAlchemy는 Python에서 SQL 데이터베이스와 상호작용하기 위한 ORM(Object Relational Mapping) 라이브러리입니다.

  • 데이터베이스 연결, SQL 쿼리 작성, 스키마 정의 등을 쉽게 수행할 수 있습니다.

LangChain은 대형 언어 모델(LLM)과 외부 데이터 소스를 연결하는 Python 라이브러리입니다.

  • LLM 활용, 문서 처리, 에이전트 실행 등의 기능을 제공합니다.

LangChain Experimental은 LangChain의 실험적 기능을 포함하는 확장 라이브러리입니다.

  • 최신 기술 또는 기능을 테스트하거나 활용할 때 사용됩니다.

PyAthena는 Python에서 Amazon Athena와 상호작용하기 위한 클라이언트 라이브러리입니다.

  • [SQLAlchemy] 옵션은 SQLAlchemy와 통합하여 Athena를 사용할 수 있도록 설정합니다.
  • Athena를 통해 S3에 저장된 데이터를 SQL 쿼리로 분석할 수 있습니다.

LangChain-AWS는 LangChain과 AWS 서비스를 통합하기 위한 라이브러리입니다.

  • AWS 서비스(예: S3, Athena, Glue, Bedrock)와 LangChain의 기능을 결합하여 데이터 처리를 자동화합니다.
%%capture
# 설치 과정에서 발생하는 로그나 경고를 표시하지 않도록 설정합니다.
!pip install sqlalchemy==2.0.29
!pip install langchain==0.1.19
!pip install langchain-experimental==0.0.58
!pip install PyAthena[SQLAlchemy]==3.8.2
!pip install -U langchain-aws==0.1.3

import os
import json
import boto3

import sqlalchemy
from sqlalchemy import create_engine

from langchain.docstore.document import Document
from langchain import PromptTemplate,SagemakerEndpoint,SQLDatabase,LLMChain
from langchain_experimental.sql import SQLDatabaseChain, SQLDatabaseSequentialChain
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts.prompt import PromptTemplate

from langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChain

from typing import Dict
import time
from langchain_aws import BedrockLLM

2. CloudFormation 구성

 

CloudFormation 템플릿은 S3 버킷과 IAM 역할을 생성하고, S3 버킷 이름, SageMaker Studio URL, 데이터베이스 및 크롤러 이름 등 주요 리소스 정보를 Outputs로 제공하여 AWS 리소스를 자동화하고 쉽게 관리할 수 있도록 설계되었습니다.

 

AWSTemplateFormatVersion: '2010-09-09'
Description: Example CloudFormation stack to demonstrate Outputs.

Resources:
  ExampleS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub "example-bucket-${AWS::StackName}"

  ExampleIAMRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "example-role-${AWS::StackName}"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: sts:AssumeRole

Outputs:
  labfilesbucket:
    Description: The name of the lab files S3 bucket.
    Value: !Sub "lab-files-${AWS::AccountId}-c9120d50"

  SageMakerStudioURL:
    Description: URL for SageMaker Studio.
    Value: !Sub "https://${AWS::Region}.console.aws.amazon.com/sagemaker/home?region=${AWS::Region}#/studio/open/d-fttc4rb26cct/default-c9120d50"

  LibraryDatabaseName:
    Description: Name of the library database.
    Value: "s3_library"

  LabfilesBucketName:
    Description: Name of the lab files bucket.
    Value: !Sub "lab-files-${AWS::AccountId}-c9120d50"

  CarsDatabaseName:
    Description: Name of the cars database.
    Value: "s3_cars"

  LibraryCrawlerName:
    Description: Name of the library crawler.
    Value: "s3_library_crawler"

  CarsCrawlerName:
    Description: Name of the cars crawler.
    Value: "s3_cars_crawler"

  ModelId:
    Description: ID of the model used.
    Value: "amazon.titan-text-lite-v1"

 

 

## CloudFormation stack name은 위 코드를 저장한 이름으로 수정
cloudformation_stack_name = '클라우드 포메이션 이름'
cfn_client = boto3.client('cloudformation')

def get_cfn_outputs(cloudformation_stack_name):
    outputs = {}
    for output in cfn_client.describe_stacks(StackName=cloudformation_stack_name)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

outputs = get_cfn_outputs(cloudformation_stack_name)
json_formatted_str = json.dumps(outputs, indent=2)
print(json_formatted_str)

 

3. AWS Glue Data Catalog & Crawler 구성

 

AWS Glue Data Catalog 및 크롤러와 관련된 변수를 정의하고, 일부는 출력(outputs)에서 가져옵니다. 이를 통해 AWS S3 버킷과 Glue 서비스 간의 데이터 처리를 설정합니다.

data_file = 's3_library_data.json'
lab_files_folder = 'library-data'
glue_db_name = outputs['LibraryDatabaseName']
glue_crawler_name = outputs['LibraryCrawlerName']

lab_files_bucket = outputs['LabfilesBucketName']

* 본 글에 사용한 데이터는 도서관 도서 정보 데이터셋으로 book_id, title, author, genre, pub_date 로 구성되어 있습니다.

 

4. 데이터 스테이징

 

AWS CLI 명령어를 사용하여 라이브러리 JSON 데이터 파일을 S3 버킷의 지정된 폴더에 업로드 합니다

!aws s3 cp {data_file} s3://{lab_files_bucket}/{lab_files_folder}/

 

5. AWS Glue Crawler 실행

 

AWS Glue 크롤러를 시작하고 크롤러의 상태를 주기적으로 확인하여 데이터 스키마를 탐지하는 작업의 성공 여부를 모니터링합니다.

client = boto3.client('glue')

print("About to start running the crawler: ", glue_crawler_name)

try:
    response = client.start_crawler(Name=glue_crawler_name )
    print("Successfully started crawler. The crawler may take 2-5 mins to detect the schema.")
    while True:
        # Get the crawler status.
        response = client.get_crawler(Name=glue_crawler_name)
         # Extract the crawler state.
        status = response['Crawler']['State']
        # Print the crawler status.
        print(f"Crawler '{glue_crawler_name}' status: {status}")
        if status == 'STOPPING':  # Replace 'READY' with the desired completed state.
            break  # Exit the loop if the desired state is reached.

        time.sleep(10)  # Sleep for 10 seconds before checking the status again.
    
except:
    print("error in starting crawler. Check the logs for the error details.")

 

결과 예시:

 

6. AWS Athena 데이터베이스 연결 설정

 

SQLAlchemy를 사용하여 AWS Athena 데이터베이스와 연결을 설정하고, LangChain에서 사용할 수 있도록 SQLAlchemy 엔진을 생성합니다.

# AWS Region 설정
region = boto3.session.Session().region_name

## Athena variables
connathena=f"athena.{region}.amazonaws.com" 
portathena='443'                                         
schemaathena=glue_db_name                                
s3stagingathena=f's3://{lab_files_bucket}/athenaresults/' 
wkgrpathena='primary'                                    

connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}/&work_group={wkgrpathena}"

## Athena SQLAlchemy engine 생성
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)

gdc = [schemaathena]
print("Connection to Athena database succeeded: ", gdc[0])

 

결과 예시:

 

7. AWS Glue Data Catalog 메타데이터 수집 및 동적 프롬프트 생성

 

AWS Glue Data Catalog에서 메타데이터를 수집하여 파이프(|)로 구분된 형식으로 반환합니다. 데이터베이스, 테이블, 열 정보를 동적으로 추출하고, 이를 기반으로 AWS Glue 카탈로그의 종합 뷰를 생성합니다

 

# AWS Glue Data Catalog로 dynamic prompts 생성
# AWS Glue crawler metadata 수집

def parse_catalog():    
    columns_str = 'database|table|column_name'
        
    glue_client = boto3.client('glue')
    
    for db in gdc:
        response = glue_client.get_tables(DatabaseName =db)
        for tables in response['TableList']:
            # Classification in the response for S3 and other databases is different. Set classification based on the response location.
            if tables['StorageDescriptor']['Location'].startswith('s3'):  classification='s3' 
            else:  classification = tables['Parameters']['classification']
            for columns in tables['StorageDescriptor']['Columns']:
                    dbname,tblname,colname=tables['DatabaseName'],tables['Name'],columns['Name']
                    columns_str = columns_str+f'\n{dbname}|{tblname}|{colname}'
    return columns_str

glue_catalog = parse_catalog()

print(glue_catalog)

 

결과 예시:

 

8. 사용자 질문에 적합한 데이터 채널 결정 및 SQL 생성

 

AWS Bedrock의 Amazon Titan 모델을 사용하여, 자연어 사용자 질의를 기반으로 적절한 데이터 채널을 식별하고, 이에 맞는 SQL 쿼리를 생성합니다. 

# Bedrock LLM model 설정

bedrock_model = 'amazon.titan-text-lite-v1'

BEDROCK_CLIENT = boto3.client("bedrock-runtime", 'us-east-1')

if bedrock_model == 'amazon.titan-text-lite-v1':
    inference_modifier = {}
else:
    inference_modifier = {"temperature":0.0, "max_tokens":50}

llm = BedrockLLM(model_id=bedrock_model, client=BEDROCK_CLIENT, model_kwargs = inference_modifier)

def identify_channel(query):
    prompt_template_titan = """You are a SQL expert. Convert the below natural language question into a valid SQL statement. The schema has the structure below:\n
     """+glue_catalog+""" 
     \n
     Here is the question to be answered:\n
     {query}
     \n
     Provide the SQL query that would retrieve the data based on the natural language request.\n
     
     """
    
    prompt_template = prompt_template_titan
    
    
    ## prompt 1 정의
    PROMPT_channel = PromptTemplate(template=prompt_template, input_variables=["query"])
    
    # LLM chain 정의
    llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)
        
    generated_texts = llm_chain.run(query)
    
    # 데이터 채널 설정
    if 's3' in generated_texts: 
            channel='db'
            db=dbathena
            print("SET database to athena")
    elif 'api' in generated_texts: 
            channel='api'
            print("SET database to weather api")        
    else: raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")
    
    # print("Step complete. Channel is: ", channel)
    
    return channel, db
    
def run_query(query):

    channel, db = identify_channel(query) # Call the identify channel function first.

    _DEFAULT_TEMPLATE = """
    Here is a schema of a table:
    <schema>
    {table_info}
    </schema>       
    Run a SQL query to answer the question. Follow this format:
    
    SQLQuery: the correct SQL query. For example: select count ( * )  from s3_library_data where genre = 'Novel'
    SQLResult: the result of the SQL query.
    Answer: convert the SQLResult to a grammatically correct sentence.
    
    Here is question: {input}"""
    
    PROMPT_sql = PromptTemplate(
        input_variables=["table_info","input"], template=_DEFAULT_TEMPLATE
    )

    
    if channel=='db':
        db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT_sql, verbose=True, return_intermediate_steps=False)
        response=db_chain.run(query)
    else: raise Exception("Unlisted channel. Check your unified catalog")
    return response

 

9. 사용자 질문 SQL 쿼리 변환 및 결과 생

 

run_query 함수를 호출하여 사용자의 자연어 질의를 SQL 쿼리로 변환하고, 해당 쿼리를 데이터 소스(Athena 또는 API)에서 실행한 후 결과를 반환합니다.

# query = """How many books with a genre of Fantasy are in the library?""" 
# query = """Find 3 books in the library with Tarzan in the title?""" 
# query = """How many books by author Stephen King are in the library?""" 
query = """How many total books are there in the library?""" 

response =  run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response}')

 

결과 예시:

 

 

Written in July 5, 2024 

by Sang Hyun Jo

반응형