RESUMO
Como Integrar ChatGPT e OpenAI API em Aplicações Web
Guia completo para desenvolvedores integrarem IA em suas aplicações com Python e JavaScript
Keywords: OpenAI API, Python, JavaScript
ÍNDICE
1. Por Que Integrar OpenAI API em 2026
2. Configuração Inicial e Autenticação
3. Implementação com Python
4. Implementação com JavaScript
5. Tratamento de Erros e Rate Limiting
6. Segurança e Melhores Práticas
7. Casos de Uso Práticos
CONTEXTO
Por Que Integrar OpenAI API em 2026
A integração de inteligência artificial em aplicações web deixou de ser um diferencial para se tornar uma necessidade em 2026. Com a OpenAI API processando mais de 100 milhões de requisições diárias e o GPT-4 Turbo oferecendo capacidades avançadas de processamento de linguagem natural, desenvolvedores têm acesso a ferramentas poderosas para criar experiências inteligentes.
O mercado de aplicações baseadas em IA cresceu 340% nos últimos dois anos, com empresas reportando aumentos de 25% na satisfação do usuário após implementar chatbots inteligentes e assistentes virtuais. Este guia demonstra como integrar essas capacidades de forma prática e segura.
PONTO-CHAVE
A API da OpenAI oferece diferentes modelos com custos variando de $0.0015 por 1K tokens (GPT-3.5-turbo) até $0.03 por 1K tokens (GPT-4), permitindo escolher o melhor custo-benefício para sua aplicação.
Vantagens da Integração OpenAI
Escalabilidade — Processa milhares de requisições simultâneas
Versatilidade — Suporte a texto, código, análise e tradução
Confiabilidade — Uptime de 99.9% com infraestrutura robusta
Atualizações Constantes — Melhorias automáticas de modelo sem alteração de código
CONFIGURAÇÃO
Configuração Inicial e Autenticação
Antes de começar a integração, você precisa obter suas credenciais da OpenAI e configurar o ambiente de desenvolvimento. O processo envolve criar uma conta, gerar uma chave de API e configurar os limites de uso adequados para sua aplicação.
Obtendo a Chave de API
Acesse platform.openai.com e siga estes passos:
1
Criar Conta e Verificar
Registre-se na plataforma e confirme seu email. A OpenAI oferece $5 em créditos gratuitos para novos usuários.
2
Gerar Chave de API
Navegue até API Keys e clique em “Create new secret key”. Guarde a chave com segurança – ela não será exibida novamente.
3
Configurar Limites de Uso
Estabeleça limites mensais de gasto para evitar custos inesperados. Recomenda-se começar com $50/mês para testes.

PONTO-CHAVE
Nunca commit sua chave de API no controle de versão. Use variáveis de ambiente ou serviços de gerenciamento de segredos como AWS Secrets Manager ou Azure Key Vault.
Configuração de Ambiente
Para manter suas credenciais seguras, configure variáveis de ambiente em diferentes ambientes de desenvolvimento:
EXPLICAÇÃO DO CÓDIGO
Este exemplo mostra como configurar variáveis de ambiente para diferentes sistemas operacionais.
# .env file (desenvolvimento local)
OPENAI_API_KEY=sk-your-api-key-here
OPENAI_ORG_ID=org-your-organization-id
# Linux/Mac (terminal)
export OPENAI_API_KEY="sk-your-api-key-here"
export OPENAI_ORG_ID="org-your-organization-id"
# Windows (PowerShell)
$env:OPENAI_API_KEY="sk-your-api-key-here"
$env:OPENAI_ORG_ID="org-your-organization-id"PYTHON
Implementação com Python
Python oferece a biblioteca oficial openai que simplifica significativamente a integração com a API. Esta seção demonstra desde a instalação básica até implementações avançadas com tratamento de erros e otimizações.
Instalação e Configuração Básica
EXPLICAÇÃO DO CÓDIGO
Instalação da biblioteca oficial da OpenAI e configuração inicial para começar a fazer requisições à API.
# Instalação da biblioteca
pip install openai python-dotenv
# requirements.txt
openai==1.10.0
python-dotenv==1.0.0
requests==2.31.0EXPLICAÇÃO DO CÓDIGO
Configuração básica do cliente OpenAI com carregamento seguro de variáveis de ambiente.
import os
from dotenv import load_dotenv
from openai import OpenAI
# Carrega variáveis de ambiente
load_dotenv()
# Inicializa o cliente OpenAI
client = OpenAI(
api_key=os.getenv('OPENAI_API_KEY'),
organization=os.getenv('OPENAI_ORG_ID') # Opcional
)
def test_connection():
"""Testa a conexão com a API"""
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello, World!"}],
max_tokens=50
)
print("✅ Conexão estabelecida com sucesso!")
print(f"Resposta: {response.choices[0].message.content}")
return True
except Exception as e:
print(f"❌ Erro na conexão: {e}")
return False
if __name__ == "__main__":
test_connection()Implementação de Chatbot Básico
Vamos criar um chatbot simples que mantém contexto da conversa e oferece respostas inteligentes. Este exemplo demonstra como implementar um sistema de mensagens com histórico e configurações otimizadas.
EXPLICAÇÃO DO CÓDIGO
Classe ChatBot que gerencia conversas, mantém histórico e implementa configurações personalizadas para diferentes tipos de assistentes.
import json
from typing import List, Dict, Optional
from datetime import datetime
class ChatBot:
def __init__(self, model: str = "gpt-3.5-turbo", system_prompt: str = None):
self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
self.model = model
self.conversation_history: List[Dict] = []
self.system_prompt = system_prompt or "Você é um assistente útil e amigável."
# Adiciona prompt do sistema
self.conversation_history.append({
"role": "system",
"content": self.system_prompt
})
def send_message(self, message: str, temperature: float = 0.7) -> Dict:
"""Envia mensagem e retorna resposta com metadata"""
try:
# Adiciona mensagem do usuário ao histórico
self.conversation_history.append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
# Limita histórico para evitar excesso de tokens
if len(self.conversation_history) > 20:
# Mantém system prompt + últimas 18 mensagens
self.conversation_history = [
self.conversation_history[0] # system prompt
] + self.conversation_history[-18:]
# Faz a requisição para a API
response = self.client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
temperature=temperature,
max_tokens=500,
presence_penalty=0.1,
frequency_penalty=0.1
)
# Extrai resposta
ai_message = response.choices[0].message.content
# Adiciona resposta ao histórico
self.conversation_history.append({
"role": "assistant",
"content": ai_message,
"timestamp": datetime.now().isoformat()
})
return {
"message": ai_message,
"tokens_used": response.usage.total_tokens,
"model": response.model,
"finish_reason": response.choices[0].finish_reason,
"cost_estimate": self.calculate_cost(response.usage.total_tokens)
}
except Exception as e:
return {
"error": str(e),
"message": "Desculpe, ocorreu um erro ao processar sua mensagem."
}
def calculate_cost(self, tokens: int) -> float:
"""Calcula custo estimado baseado no modelo"""
rates = {
"gpt-3.5-turbo": 0.0015 / 1000, # $0.0015 per 1K tokens
"gpt-4": 0.03 / 1000, # $0.03 per 1K tokens
"gpt-4-turbo": 0.01 / 1000 # $0.01 per 1K tokens
}
return tokens * rates.get(self.model, 0.002)
def get_conversation_summary(self) -> Dict:
"""Retorna resumo da conversa"""
user_messages = len([m for m in self.conversation_history if m["role"] == "user"])
ai_messages = len([m for m in self.conversation_history if m["role"] == "assistant"])
return {
"total_exchanges": min(user_messages, ai_messages),
"user_messages": user_messages,
"ai_messages": ai_messages,
"model": self.model
}
def export_conversation(self, filepath: str):
"""Exporta conversa para arquivo JSON"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(self.conversation_history, f, ensure_ascii=False, indent=2)
# Exemplo de uso
if __name__ == "__main__":
# Cria chatbot especializado em programação
programming_bot = ChatBot(
model="gpt-4-turbo",
system_prompt="""Você é um especialista em programação Python.
Forneça código limpo, explicações claras e melhores práticas."""
)
# Simula conversa
response = programming_bot.send_message(
"Como implementar cache em uma API Flask?"
)
print(f"Resposta: {response['message']}")
print(f"Tokens utilizados: {response['tokens_used']}")
print(f"Custo estimado: ${response['cost_estimate']:.4f}")
# Exporta conversa
programming_bot.export_conversation("conversation_log.json")
PONTO-CHAVE
O GPT-4 Turbo oferece janela de contexto de 128K tokens (aproximadamente 300 páginas de texto), permitindo conversas muito mais longas e análise de documentos extensos.
API Flask para Integração Web
Para disponibilizar as funcionalidades do ChatGPT em uma aplicação web, precisamos criar uma API REST que gerencie as requisições de forma eficiente e segura.
EXPLICAÇÃO DO CÓDIGO
API Flask completa com endpoints para chat, autenticação, rate limiting e monitoramento de uso.
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from functools import wraps
import redis
import hashlib
import time
app = Flask(__name__)
# Configuração do Redis para cache e rate limiting
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Rate Limiter
limiter = Limiter(
app,
key_func=get_remote_address,
storage_uri="redis://localhost:6379"
)
def require_api_key(f):
"""Decorator para verificar API key"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or not validate_api_key(api_key):
return jsonify({'error': 'Invalid or missing API key'}), 401
return f(*args, **kwargs)
return decorated_function
def validate_api_key(api_key: str) -> bool:
"""Valida API key (implementar conforme necessário)"""
# Aqui você implementaria a validação real
valid_keys = os.getenv('VALID_API_KEYS', '').split(',')
return api_key in valid_keys
def get_cached_response(cache_key: str) -> Optional[str]:
"""Busca resposta em cache"""
try:
cached = redis_client.get(cache_key)
return cached.decode('utf-8') if cached else None
except:
return None
def cache_response(cache_key: str, response: str, ttl: int = 3600):
"""Armazena resposta em cache"""
try:
redis_client.setex(cache_key, ttl, response)
except:
pass
@app.route('/api/chat', methods=['POST'])
@limiter.limit("100 per hour") # Limite de 100 requests por hora
@require_api_key
def chat():
"""Endpoint principal para chat"""
try:
data = request.get_json()
# Validação de dados
if not data or 'message' not in data:
return jsonify({'error': 'Message is required'}), 400
message = data['message']
model = data.get('model', 'gpt-3.5-turbo')
temperature = data.get('temperature', 0.7)
max_tokens = min(data.get('max_tokens', 500), 1000) # Limite máximo
# Gera chave de cache baseada no conteúdo
cache_key = hashlib.md5(f"{message}:{model}:{temperature}".encode()).hexdigest()
# Verifica cache primeiro
cached_response = get_cached_response(cache_key)
if cached_response:
response_data = json.loads(cached_response)
response_data['cached'] = True
return jsonify(response_data)
# Inicializa bot e faz requisição
bot = ChatBot(model=model)
result = bot.send_message(message, temperature)
if 'error' in result:
return jsonify(result), 500
# Prepara resposta
response_data = {
'message': result['message'],
'tokens_used': result['tokens_used'],
'model': result['model'],
'cost_estimate': result['cost_estimate'],
'cached': False,
'timestamp': datetime.now().isoformat()
}
# Armazena em cache apenas respostas bem-sucedidas
cache_response(cache_key, json.dumps(response_data))
return jsonify(response_data)
except Exception as e:
app.logger.error(f"Chat error: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/models', methods=['GET'])
@require_api_key
def get_available_models():
"""Lista modelos disponíveis"""
models = [
{
'id': 'gpt-3.5-turbo',
'name': 'GPT-3.5 Turbo',
'description': 'Rápido e econômico para a maioria dos casos',
'cost_per_1k_tokens': 0.0015,
'context_window': 4096
},
{
'id': 'gpt-4',
'name': 'GPT-4',
'description': 'Mais preciso para tarefas complexas',
'cost_per_1k_tokens': 0.03,
'context_window': 8192
},
{
'id': 'gpt-4-turbo',
'name': 'GPT-4 Turbo',
'description': 'Melhor custo-benefício com janela expandida',
'cost_per_1k_tokens': 0.01,
'context_window': 128000
}
]
return jsonify({'models': models})
@app.route('/api/usage', methods=['GET'])
@require_api_key
def get_usage_stats():
"""Estatísticas de uso da API"""
api_key = request.headers.get('X-API-Key')
# Busca estatísticas do Redis (implementar conforme necessário)
stats = {
'requests_today': 0,
'tokens_used_today': 0,
'cost_today': 0.0,
'rate_limit_remaining': 100, # Implementar lógica real
'last_request': None
}
return jsonify(stats)
@app.errorhandler(429)
def ratelimit_handler(e):
"""Handler para rate limit excedido"""
return jsonify({
'error': 'Rate limit exceeded',
'message': 'Too many requests. Please try again later.',
'retry_after': e.retry_after
}), 429
if __name__ == '__main__':
app.run(debug=True, port=5000)JAVASCRIPT
Implementação com JavaScript
JavaScript oferece flexibilidade para implementar integrações tanto no frontend quanto no backend (Node.js). Esta seção abrange desde requisições básicas até implementações avançadas com React e Express.js.
Cliente JavaScript para Frontend
EXPLICAÇÃO DO CÓDIGO
Classe JavaScript para gerenciar requisições à OpenAI API com retry automático, cache local e controle de rate limiting.
class OpenAIClient {
constructor(apiKey, baseURL = 'https://api.openai.com/v1') {
this.apiKey = apiKey;
this.baseURL = baseURL;
this.cache = new Map();
this.rateLimitInfo = {
remainingRequests: 1000,
resetTime: null
};
}
async makeRequest(endpoint, data, options = {}) {
const {
method = 'POST',
timeout = 30000,
retries = 3,
useCache = true
} = options;
// Verifica rate limit
if (this.rateLimitInfo.remainingRequests <= 0) {
const waitTime = this.rateLimitInfo.resetTime - Date.now();
if (waitTime > 0) {
throw new Error(`Rate limit exceeded. Reset in ${Math.ceil(waitTime / 1000)} seconds`);
}
}
// Gera chave de cache
const cacheKey = useCache ?
`${endpoint}:${JSON.stringify(data)}` : null;
// Verifica cache
if (cacheKey && this.cache.has(cacheKey)) {
const cached = this.cache.get(cacheKey);
if (Date.now() - cached.timestamp < 300000) { // 5 minutos
return { ...cached.data, cached: true };
}
}
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
try {
const response = await this.fetchWithRetry(
`${this.baseURL}${endpoint}`,
{
method,
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(data),
signal: controller.signal
},
retries
);
// Atualiza informações de rate limit
this.updateRateLimitInfo(response.headers);
const result = await response.json();
// Armazena em cache
if (cacheKey && response.ok) {
this.cache.set(cacheKey, {
data: result,
timestamp: Date.now()
});
// Limita tamanho do cache
if (this.cache.size > 100) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
}
if (!response.ok) {
throw new Error(`API Error: ${result.error?.message || response.statusText}`);
}
return result;
} catch (error) {
if (error.name === 'AbortError') {
throw new Error('Request timeout');
}
throw error;
} finally {
clearTimeout(timeoutId);
}
}
async fetchWithRetry(url, options, retries) {
for (let i = 0; i <= retries; i++) {
try {
const response = await fetch(url, options);
// Não tenta novamente para erros 4xx (exceto 429)
if (response.status >= 400 && response.status < 500 && response.status !== 429) {
return response;
}
// Para 429 (rate limit), espera o tempo sugerido
if (response.status === 429) {
const retryAfter = response.headers.get('retry-after') || '1';
await this.sleep(parseInt(retryAfter) * 1000);
continue;
}
if (response.ok) {
return response;
}
// Para outros erros 5xx, usa backoff exponencial
if (i < retries) {
await this.sleep(Math.pow(2, i) * 1000);
}
} catch (error) {
if (i === retries) throw error;
await this.sleep(Math.pow(2, i) * 1000);
}
}
}
updateRateLimitInfo(headers) {
const remaining = headers.get('x-ratelimit-remaining-requests');
const resetTime = headers.get('x-ratelimit-reset-requests');
if (remaining) {
this.rateLimitInfo.remainingRequests = parseInt(remaining);
}
if (resetTime) {
this.rateLimitInfo.resetTime = Date.now() + (parseFloat(resetTime) * 1000);
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async createChatCompletion(messages, options = {}) {
const data = {
model: options.model || 'gpt-3.5-turbo',
messages: messages,
temperature: options.temperature || 0.7,
max_tokens: options.maxTokens || 500,
presence_penalty: options.presencePenalty || 0,
frequency_penalty: options.frequencyPenalty || 0,
stream: options.stream || false
};
return this.makeRequest('/chat/completions', data, {
useCache: options.useCache !== false
});
}
async createCompletion(prompt, options = {}) {
const data = {
model: options.model || 'text-davinci-003',
prompt: prompt,
temperature: options.temperature || 0.7,
max_tokens: options.maxTokens || 500
};
return this.makeRequest('/completions', data);
}
getRateLimitStatus() {
return {
remainingRequests: this.rateLimitInfo.remainingRequests,
resetTime: this.rateLimitInfo.resetTime,
cacheSize: this.cache.size
};
}
clearCache() {
this.cache.clear();
}
}
// Exemplo de uso
const openai = new OpenAIClient('your-api-key-here');
// Função auxiliar para chat simples
async function simpleChat(message, options = {}) {
try {
const response = await openai.createChatCompletion([
{ role: 'user', content: message }
], options);
return {
message: response.choices[0].message.content,
usage: response.usage,
model: response.model,
cached: response.cached || false
};
} catch (error) {
console.error('Chat error:', error);
return {
error: error.message,
message: 'Desculpe, ocorreu um erro ao processar sua mensagem.'
};
}
}
// Exemplo de uso
simpleChat('Explique quantum computing em termos simples', {
model: 'gpt-4',
temperature: 0.3,
maxTokens: 200
}).then(result => {
console.log('Resposta:', result.message);
console.log('Tokens usados:', result.usage?.total_tokens);
console.log('Foi cache:', result.cached);
});Interface React com ChatGPT
Vamos criar uma interface React completa que demonstra como integrar o ChatGPT em uma aplicação moderna, com funcionalidades como streaming de respostas, histórico persistente e múltiplas conversas.
EXPLICAÇÃO DO CÓDIGO
Componente React completo para chat com ChatGPT, incluindo estado de conversa, streaming de respostas e persistência local.
import React, { useState, useEffect, useRef } from 'react';
import './ChatInterface.css';
const ChatInterface = ({ apiKey }) => {
const [messages, setMessages] = useState([]);
const [inputMessage, setInputMessage] = useState('');
const [isLoading, setIsLoading] = useState(false);
const [streamingMessage, setStreamingMessage] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [settings, setSettings] = useState({
model: 'gpt-3.5-turbo',
temperature: 0.7,
maxTokens: 500
});
const [totalTokens, setTotalTokens] = useState(0);
const [estimatedCost, setEstimatedCost] = useState(0);
const messagesEndRef = useRef(null);
const openAIClientRef = useRef(null);
// Inicializa cliente OpenAI
useEffect(() => {
if (apiKey) {
openAIClientRef.current = new OpenAIClient(apiKey);
}
}, [apiKey]);
// Carrega mensagens do localStorage
useEffect(() => {
const savedMessages = localStorage.getItem('chatMessages');
if (savedMessages) {
setMessages(JSON.parse(savedMessages));
}
const savedStats = localStorage.getItem('chatStats');
if (savedStats) {
const stats = JSON.parse(savedStats);
setTotalTokens(stats.totalTokens || 0);
setEstimatedCost(stats.estimatedCost || 0);
}
}, []);
// Salva mensagens no localStorage
useEffect(() => {
if (messages.length > 0) {
localStorage.setItem('chatMessages', JSON.stringify(messages));
}
}, [messages]);
// Auto-scroll para últimas mensagens
useEffect(() => {
scrollToBottom();
}, [messages, streamingMessage]);
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
const updateStats = (tokensUsed, cost) => {
const newTotalTokens = totalTokens + tokensUsed;
const newTotalCost = estimatedCost + cost;
setTotalTokens(newTotalTokens);
setEstimatedCost(newTotalCost);
localStorage.setItem('chatStats', JSON.stringify({
totalTokens: newTotalTokens,
estimatedCost: newTotalCost
}));
};
const sendMessage = async () => {
if (!inputMessage.trim() || isLoading || !openAIClientRef.current) return;
const userMessage = {
id: Date.now(),
role: 'user',
content: inputMessage.trim(),
timestamp: new Date().toISOString()
};
setMessages(prev => [...prev, userMessage]);
setInputMessage('');
setIsLoading(true);
try {
// Prepara contexto da conversa
const conversationHistory = [
...messages,
userMessage
].map(msg => ({
role: msg.role,
content: msg.content
}));
// Limita histórico para evitar excesso de tokens
const limitedHistory = conversationHistory.slice(-10);
if (settings.streaming) {
await handleStreamingResponse(limitedHistory);
} else {
await handleRegularResponse(limitedHistory);
}
} catch (error) {
console.error('Erro ao enviar mensagem:', error);
const errorMessage = {
id: Date.now(),
role: 'assistant',
content: `Erro: ${error.message}`,
timestamp: new Date().toISOString(),
isError: true
};
setMessages(prev => [...prev, errorMessage]);
} finally {
setIsLoading(false);
}
};
const handleRegularResponse = async (conversationHistory) => {
const response = await openAIClientRef.current.createChatCompletion(
conversationHistory,
settings
);
const assistantMessage = {
id: Date.now(),
role: 'assistant',
content: response.choices[0].message.content,
timestamp: new Date().toISOString(),
metadata: {
model: response.model,
tokensUsed: response.usage.total_tokens,
cached: response.cached
}
};
setMessages(prev => [...prev, assistantMessage]);
if (response.usage) {
const cost = calculateCost(response.usage.total_tokens, response.model);
updateStats(response.usage.total_tokens, cost);
}
};
const handleStreamingResponse = async (conversationHistory) => {
setIsStreaming(true);
setStreamingMessage('');
// Aqui você implementaria o streaming usando Server-Sent Events
// ou WebSockets conforme sua arquitetura backend
// Simulação para demonstração
const response = await openAIClientRef.current.createChatCompletion(
conversationHistory,
{ ...settings, stream: false }
);
const content = response.choices[0].message.content;
// Simula streaming palavra por palavra
const words = content.split(' ');
let currentContent = '';
for (let i = 0; i < words.length; i++) {
currentContent += (i > 0 ? ' ' : '') + words[i];
setStreamingMessage(currentContent);
await new Promise(resolve => setTimeout(resolve, 50));
}
const assistantMessage = {
id: Date.now(),
role: 'assistant',
content: content,
timestamp: new Date().toISOString(),
metadata: {
model: response.model,
tokensUsed: response.usage.total_tokens
}
};
setMessages(prev => [...prev, assistantMessage]);
setStreamingMessage('');
setIsStreaming(false);
if (response.usage) {
const cost = calculateCost(response.usage.total_tokens, response.model);
updateStats(response.usage.total_tokens, cost);
}
};
const calculateCost = (tokens, model) => {
const rates = {
'gpt-3.5-turbo': 0.0015 / 1000,
'gpt-4': 0.03 / 1000,
'gpt-4-turbo': 0.01 / 1000
};
return tokens * (rates[model] || 0.002);
};
const clearConversation = () => {
setMessages([]);
setStreamingMessage('');
localStorage.removeItem('chatMessages');
};
const handleKeyPress = (e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
};
return (
<div className="chat-interface">
{/* Header com configurações */}
<div className="chat-header">
<h2>ChatGPT Interface</h2>
<div className="chat-stats">
<span>Tokens: {totalTokens.toLocaleString()}</span>
<span>Custo: ${estimatedCost.toFixed(4)}</span>
</div>
</div>
{/* Área de configurações */}
<div className="chat-settings">
<select
value={settings.model}
onChange={(e) => setSettings({...settings, model: e.target.value})}
>
<option value="gpt-3.5-turbo">GPT-3.5 Turbo</option>
<option value="gpt-4">GPT-4</option>
<option value="gpt-4-turbo">GPT-4 Turbo</option>
</select>
<label>
Temperature: {settings.temperature}
<input
type="range"
min="0"
max="2"
step="0.1"
value={settings.temperature}
onChange={(e) => setSettings({...settings, temperature: parseFloat(e.target.value)})}
/>
</label>
<button onClick={clearConversation}>
Limpar Conversa
</button>
</div>
{/* Área de mensagens */}
<div className="messages-container">
{messages.map((message) => (
<div key={message.id} className={`message ${message.role}`}>
<div className="message-content">
{message.content}
</div>
<div className="message-meta">
{message.timestamp && new Date(message.timestamp).toLocaleTimeString()}
{message.metadata && (
<span> • {message.metadata.tokensUsed} tokens • {message.metadata.model}</span>
)}
</div>
</div>
))}
{isStreaming && streamingMessage && (
<div className="message assistant streaming">
<div className="message-content">
{streamingMessage}
<span className="cursor">|</span>
</div>
</div>
)}
{isLoading && !isStreaming && (
<div className="message assistant loading">
<div className="message-content">
Pensando...
</div>
</div>
)}
<div ref={messagesEndRef} />
</div>
{/* Área de input */}
<div className="input-area">
<textarea
value={inputMessage}
onChange={(e) => setInputMessage(e.target.value)}
onKeyPress={handleKeyPress}
placeholder="Digite sua mensagem... (Enter para enviar, Shift+Enter para nova linha)"
disabled={isLoading}
rows={3}
/>
<button
onClick={sendMessage}
disabled={isLoading || !inputMessage.trim()}
className="send-button"
>
{isLoading ? '...' : 'Enviar'}
</button>
</div>
</div>
);
};
export default ChatInterface;
TRATAMENTO DE ERROS
Tratamento de Erros e Rate Limiting
A OpenAI API possui limitações rigorosas de rate limiting e pode retornar diferentes tipos de erros. Um tratamento adequado garante experiência do usuário consistente e evita custos desnecessários. A API implementa limites de 3 RPM (requests per minute) para contas gratuitas e até 3,500 RPM para contas pagas.
Tipos de Erro Comuns
PROBLEMA 01
Rate Limiting (HTTP 429)
Ocorre quando você excede o limite de requisições por minuto. A resposta inclui header retry-after indicando quando tentar novamente.
SOLUÇÃO — Implementar backoff exponencial com retry
class RateLimitHandler {
constructor() {
this.requestCounts = new Map();
this.resetTimes = new Map();
}
async handleRequest(apiKey, requestFn, maxRetries = 3) {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await requestFn();
} catch (error) {
if (error.response?.status === 429) {
const retryAfter = error.response.headers['retry-after'];
const waitTime = retryAfter ?
parseInt(retryAfter) * 1000 :
Math.pow(2, attempt) * 1000;
console.log(`Rate limited. Waiting ${waitTime}ms...`);
await this.sleep(waitTime);
continue;
}
throw error;
}
}
throw new Error('Max retries exceeded');
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}PROBLEMA 02
Limite de Tokens Excedido (HTTP 400)
Ocorre quando a requisição + resposta excedem o limite máximo de tokens do modelo (4K para GPT-3.5, 128K para GPT-4 Turbo).
SOLUÇÃO — Gerenciamento inteligente de tokens
function estimateTokens(text) {
// Estimativa aproximada: 1 token ≈ 4 caracteres para inglês
// Para português, usar fator de 3.5
return Math.ceil(text.length / 3.5);
}
function truncateConversation(messages, maxTokens = 3000) {
const systemMessage = messages.find(m => m.role === 'system');
let userMessages = messages.filter(m => m.role !== 'system');
let totalTokens = systemMessage ?
estimateTokens(systemMessage.content) : 0;
const result = systemMessage ? [systemMessage] : [];
// Adiciona mensagens mais recentes primeiro
for (let i = userMessages.length - 1; i >= 0; i--) {
const messageTokens = estimateTokens(userMessages[i].content);
if (totalTokens + messageTokens > maxTokens) {
break;
}
result.unshift(userMessages[i]);
totalTokens += messageTokens;
}
return result;
}PONTO-CHAVE
A OpenAI cobra tanto por tokens de entrada quanto de saída. Um prompt de 1K tokens que gera resposta de 500 tokens custará pelo total de 1.5K tokens.
Sistema de Rate Limiting Avançado
EXPLICAÇÃO DO CÓDIGO
Sistema completo de rate limiting com sliding window, priorização de requisições e monitoramento em tempo real.
class AdvancedRateLimiter {
constructor(options = {}) {
this.limits = {
requestsPerMinute: options.rpm || 60,
tokensPerMinute: options.tpm || 90000,
requestsPerDay: options.rpd || 10000
};
this.windows = {
minute: [],
day: []
};
this.queue = [];
this.processing = false;
this.metrics = {
totalRequests: 0,
totalTokens: 0,
rejectedRequests: 0,
averageWaitTime: 0
};
}
async addRequest(requestFn, priority = 'normal', estimatedTokens = 1000) {
return new Promise((resolve, reject) => {
const request = {
id: Date.now() + Math.random(),
fn: requestFn,
priority: priority,
estimatedTokens: estimatedTokens,
timestamp: Date.now(),
resolve: resolve,
reject: reject
};
// Insere na posição correta baseado na prioridade
const priorities = { high: 0, normal: 1, low: 2 };
const insertIndex = this.queue.findIndex(r =>
priorities[r.priority] > priorities[priority]
);
if (insertIndex === -1) {
this.queue.push(request);
} else {
this.queue.splice(insertIndex, 0, request);
}
this.processQueue();
});
}
async processQueue() {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
while (this.queue.length > 0) {
const now = Date.now();
this.cleanWindows(now);
const nextRequest = this.queue[0];
// Verifica se pode processar a requisição
if (this.canMakeRequest(nextRequest.estimatedTokens, now)) {
this.queue.shift();
try {
const startTime = Date.now();
const result = await nextRequest.fn();
const endTime = Date.now();
// Registra métricas
this.recordRequest(
nextRequest.estimatedTokens,
now,
endTime - startTime
);
nextRequest.resolve(result);
} catch (error) {
this.metrics.rejectedRequests++;
nextRequest.reject(error);
}
} else {
// Calcula tempo de espera
const waitTime = this.calculateWaitTime(nextRequest.estimatedTokens, now);
if (waitTime > 300000) { // 5 minutos
// Rejeita requisições com espera muito longa
const request = this.queue.shift();
this.metrics.rejectedRequests++;
request.reject(new Error('Request timeout: wait time too long'));
continue;
}
console.log(`Rate limited. Waiting ${waitTime}ms for next request...`);
await this.sleep(Math.min(waitTime, 60000)); // Max 1 minuto de espera
}
}
this.processing = false;
}
canMakeRequest(estimatedTokens, now) {
// Verifica limite de requisições por minuto
const minuteRequests = this.windows.minute.length;
if (minuteRequests >= this.limits.requestsPerMinute) {
return false;
}
// Verifica limite de tokens por minuto
const minuteTokens = this.windows.minute.reduce((sum, req) =>
sum + req.tokens, 0);
if (minuteTokens + estimatedTokens > this.limits.tokensPerMinute) {
return false;
}
// Verifica limite de requisições por dia
const dayRequests = this.windows.day.length;
if (dayRequests >= this.limits.requestsPerDay) {
return false;
}
return true;
}
calculateWaitTime(estimatedTokens, now) {
const minuteWindow = 60 * 1000;
const oldestMinuteRequest = this.windows.minute[0];
if (oldestMinuteRequest) {
const timeSinceOldest = now - oldestMinuteRequest.timestamp;
return Math.max(0, minuteWindow - timeSinceOldest);
}
return 0;
}
cleanWindows(now) {
const minuteAgo = now - (60 * 1000);
const dayAgo = now - (24 * 60 * 60 * 1000);
this.windows.minute = this.windows.minute.filter(
req => req.timestamp > minuteAgo
);
this.windows.day = this.windows.day.filter(
req => req.timestamp > dayAgo
);
}
recordRequest(tokens, timestamp, responseTime) {
const record = { timestamp, tokens, responseTime };
this.windows.minute.push(record);
this.windows.day.push(record);
this.metrics.totalRequests++;
this.metrics.totalTokens += tokens;
// Atualiza tempo médio de resposta
this.metrics.averageWaitTime =
(this.metrics.averageWaitTime * (this.metrics.totalRequests - 1) + responseTime) /
this.metrics.totalRequests;
}
getStatus() {
const now = Date.now();
this.cleanWindows(now);
return {
queue: {
length: this.queue.length,
processing: this.processing,
nextPriority: this.queue[0]?.priority
},
limits: this.limits,
current: {
requestsThisMinute: this.windows.minute.length,
tokensThisMinute: this.windows.minute.reduce((sum, req) =>
sum + req.tokens, 0),
requestsToday: this.windows.day.length
},
metrics: this.metrics
};
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Exemplo de uso
const rateLimiter = new AdvancedRateLimiter({
rpm: 100, // 100 requests per minute
tpm: 150000, // 150K tokens per minute
rpd: 5000 // 5K requests per day
});
// Função para fazer requisição com rate limiting
async function makeOpenAIRequest(messages, priority = 'normal') {
const estimatedTokens = messages.reduce((sum, msg) =>
sum + estimateTokens(msg.content), 0) + 500; // +500 para resposta estimada
return rateLimiter.addRequest(async () => {
return await client.chat.completions.create({
model: "gpt-3.5-turbo",
messages: messages,
max_tokens: 500
});
}, priority, estimatedTokens);
}
// Monitoramento do rate limiter
setInterval(() => {
const status = rateLimiter.getStatus();
console.log('Rate Limiter Status:', JSON.stringify(status, null, 2));
}, 30000); // Log a cada 30 segundosSEGURANÇA
Segurança e Melhores Práticas
A segurança é fundamental ao integrar APIs de IA em aplicações de produção. Dados sensíveis podem ser expostos através de prompts mal construídos, e custos podem escalar rapidamente sem controles adequados. Em 2025, ataques de prompt injection aumentaram 280%, tornando a validação rigorosa essencial.

Proteção de API Keys
AVISO
Nunca exponha API keys no frontend. Uma chave vazada pode gerar milhares de dólares em custos em poucas horas. Sempre use proxy no backend.
Lista de verificação de Segurança
☑ API keys armazenadas em variáveis de ambiente
☑ Proxy backend para chamadas da API
☑ Rate limiting por usuário implementado
☑ Validação e sanitização de input
☑ Logs de auditoria configurados
☑ Limite máximo de tokens por requisição
☑ Timeout configurado para requisições
Middleware de Segurança
EXPLICAÇÃO DO CÓDIGO
Middleware completo para Flask que implementa autenticação, validação de input, rate limiting por usuário e logging de auditoria.
import jwt
import re
import bleach
from datetime import datetime, timedelta
from functools import wraps
import logging
# Configuração de logging
logging.basicConfig(level=logging.INFO)
audit_logger = logging.getLogger('audit')
class SecurityMiddleware:
def __init__(self, app, redis_client, config):
self.app = app
self.redis = redis_client
self.config = config
self.setup_middleware()
def setup_middleware(self):
self.app.before_request_funcs.setdefault(None, []).append(
self.security_check
)
def security_check(self):
# Log da requisição
self.log_request()
# Verifica rate limiting
if not self.check_rate_limit():
return jsonify({'error': 'Rate limit exceeded'}), 429
# Valida token de autenticação
if not self.validate_auth_token():
return jsonify({'error': 'Invalid authentication'}), 401
def secure_openai_endpoint(max_tokens=1000, require_auth=True):
"""Decorator para endpoints da OpenAI com segurança"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
# Extrai dados da requisição
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Validação básica
validation_errors = validate_openai_request(data)
if validation_errors:
audit_logger.warning(f"Validation failed: {validation_errors}")
return jsonify({'errors': validation_errors}), 400
# Sanitiza input
data = sanitize_request_data(data)
# Verifica limite de tokens
estimated_tokens = estimate_total_tokens(data)
if estimated_tokens > max_tokens:
return jsonify({
'error': f'Request too large: {estimated_tokens} tokens (max: {max_tokens})'
}), 400
# Detecta tentativas de prompt injection
if detect_prompt_injection(data.get('message', '')):
audit_logger.warning(f"Prompt injection detected from {request.remote_addr}")
return jsonify({'error': 'Invalid request content'}), 400
# Executa função original com dados validados
request.validated_data = data
return f(*args, **kwargs)
except Exception as e:
audit_logger.error(f"Security middleware error: {str(e)}")
return jsonify({'error': 'Request processing failed'}), 500
return decorated_function
return decorator
def validate_openai_request(data):
"""Valida estrutura da requisição OpenAI"""
errors = []
# Verifica campos obrigatórios
if 'message' not in data:
errors.append('Message is required')
# Valida tamanho da mensagem
if 'message' in data:
message = data['message']
if not isinstance(message, str):
errors.append('Message must be a string')
elif len(message.strip()) == 0:
errors.append('Message cannot be empty')
elif len(message) > 10000: # 10k characters limit
errors.append('Message too long (max 10,000 characters)')
# Valida parâmetros opcionais
if 'temperature' in data:
temp = data['temperature']
if not isinstance(temp, (int, float)) or temp < 0 or temp > 2:
errors.append('Temperature must be between 0 and 2')
if 'max_tokens' in data:
max_tokens = data['max_tokens']
if not isinstance(max_tokens, int) or max_tokens < 1 or max_tokens > 4000:
errors.append('max_tokens must be between 1 and 4000')
if 'model' in data:
valid_models = ['gpt-3.5-turbo', 'gpt-4', 'gpt-4-turbo']
if data['model'] not in valid_models:
errors.append(f'Invalid model. Must be one of: {valid_models}')
return errors
def sanitize_request_data(data):
"""Sanitiza dados de entrada"""
sanitized = data.copy()
# Sanitiza mensagem principal
if 'message' in sanitized:
message = sanitized['message']
# Remove HTML tags
message = bleach.clean(message, strip=True)
# Remove caracteres de controle
message = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', message)
# Normaliza espaços em branco
message = ' '.join(message.split())
sanitized['message'] = message
# Sanitiza histórico de conversa se presente
if 'conversation_history' in sanitized:
history = sanitized['conversation_history']
if isinstance(history, list):
for msg in history:
if isinstance(msg, dict) and 'content' in msg:
msg['content'] = bleach.clean(msg['content'], strip=True)
return sanitized
def detect_prompt_injection(message):
"""Detecta tentativas de prompt injection"""
message_lower = message.lower()
# Padrões suspeitos
injection_patterns = [
r'ignore\s+previous\s+instructions',
r'forget\s+everything',
r'system\s*:\s*you\s+are',
r'new\s+instructions\s*:',
r'override\s+safety',
r'jailbreak',
r'roleplay\s+as',
r'pretend\s+to\s+be',
r'\[SYSTEM\]|\[USER\]|\[ASSISTANT\]',
r'<script>|</script>',
r'javascript:|data:',
]
for pattern in injection_patterns:
if re.search(pattern, message_lower):
return True
# Verifica tentativas de manipulação de tokens
special_tokens = ['<|endoftext|>', '<|startoftext|>', '<|im_start|>', '<|im_end|>']
for token in special_tokens:
if token in message:
return True
# Verifica repetições excessivas (possível ataque de spam)
words = message.split()
if len(words) > 10:
unique_words = set(words)
if len(unique_words) / len(words) < 0.3: # Menos de 30% palavras únicas
return True
return False
def estimate_total_tokens(data):
"""Estima total de tokens da requisição"""
message = data.get('message', '')
max_tokens = data.get('max_tokens', 500)
# Estima tokens da mensagem de entrada
input_tokens = len(message) / 3.5 # Aproximação para português
# Adiciona tokens do histórico se presente
if 'conversation_history' in data:
for msg in data['conversation_history']:
if isinstance(msg, dict) and 'content' in msg:
input_tokens += len(msg['content']) / 3.5
# Soma tokens estimados de resposta
return int(input_tokens + max_tokens)
class UserRateLimiter:
"""Rate limiter por usuário"""
def __init__(self, redis_client):
self.redis = redis_client
def is_allowed(self, user_id, endpoint, limits):
"""Verifica se usuário pode fazer requisição"""
current_time = datetime.now()
for period, max_requests in limits.items():
key = f"rate_limit:{user_id}:{endpoint}:{period}"
# Busca contador atual
current_count = self.redis.get(key)
current_count = int(current_count) if current_count else 0
if current_count >= max_requests:
return False, f"Rate limit exceeded for {period}"
# Incrementa contador
pipe = self.redis.pipeline()
pipe.incr(key)
# Define expiração baseada no período
if period == 'minute':
pipe.expire(key, 60)
elif period == 'hour':
pipe.expire(key, 3600)
elif period == 'day':
pipe.expire(key, 86400)
pipe.execute()
return True, None
def get_usage(self, user_id, endpoint):
"""Retorna uso atual do usuário"""
usage = {}
for period in ['minute', 'hour', 'day']:
key = f"rate_limit:{user_id}:{endpoint}:{period}"
count = self.redis.get(key)
usage[period] = int(count) if count else 0
return usage
# Configuração e uso
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
rate_limiter = UserRateLimiter(redis_client)
@app.route('/api/secure-chat', methods=['POST'])
@secure_openai_endpoint(max_tokens=1000, require_auth=True)
def secure_chat():
"""Endpoint seguro para chat"""
user_id = g.current_user['id']
# Verifica rate limiting por usuário
limits = {
'minute': 10, # 10 requests por minuto
'hour': 100, # 100 requests por hora
'day': 500 # 500 requests por dia
}
allowed, error = rate_limiter.is_allowed(user_id, 'chat', limits)
if not allowed:
return jsonify({'error': error}), 429
# Processa requisição validada
data = request.validated_data
try:
# Faz chamada para OpenAI
bot = ChatBot(model=data.get('model', 'gpt-3.5-turbo'))
response = bot.send_message(
data['message'],
temperature=data.get('temperature', 0.7)
)
# Log de auditoria
audit_logger.info(f"Successful chat request from user {user_id}")
return jsonify(response)
except Exception as e:
audit_logger.error(f"Chat error for user {user_id}: {str(e)}")
return jsonify({'error': 'Chat processing failed'}), 500PONTO-CHAVE
Implemente monitoramento de custos em tempo real. Configure alertas quando gastos diários excedem $50 ou quando um usuário consome mais de 10K tokens por hora.
APLICAÇÕES PRÁTICAS
Casos de Uso Práticos
A versatilidade da OpenAI API permite implementar soluções em diversos domínios. Esta seção apresenta casos reais com implementações completas, demonstrando como adaptar a integração para necessidades específicas.
Sistema de Atendimento ao Cliente
Um sistema de suporte que integra ChatGPT pode reduzir tickets de primeiro nível em até 60%, conforme dados de empresas que implementaram a solução em 2025.
E-commerce – Suporte Técnico
Resolução automática de dúvidas sobre produtos, pedidos e devoluções com escalação inteligente para humanos.
SaaS – Onboarding Inteligente
Assistente que guia novos usuários através de funcionalidades complexas, reduzindo tempo de adoção em 40%.
Educação – Tutoria Personalizada
Tutor virtual que adapta explicações ao nível do aluno e identifica lacunas de conhecimento.

EXPLICAÇÃO DO CÓDIGO
Sistema de suporte que classifica tickets, busca em base de conhecimento e escalona para humanos quando necessário.
class IntelligentSupport:
def __init__(self, openai_client, knowledge_base):
self.client = openai_client
self.knowledge_base = knowledge_base
self.escalation_triggers = [
'quero falar com humano',
'não está funcionando',
'problema urgente',
'reclamação',
'cancelar conta',
'reembolso'
]
async def process_support_request(self, user_message, user_context):
"""Processa requisição de suporte"""
# Classifica o tipo de solicitação
classification = await self.classify_request(user_message)
# Busca informações relevantes na base de conhecimento
relevant_info = await self.search_knowledge_base(
user_message,
classification['category']
)
# Verifica se deve escalar para humano
should_escalate = self.should_escalate_to_human(
user_message,
classification,
user_context
)
if should_escalate:
return await self.escalate_to_human(
user_message,
classification,
user_context
)
# Gera resposta automatizada
response = await self.generate_support_response(
user_message,
classification,
relevant_info,
user_context
)
return response
async def classify_request(self, message):
"""Classifica tipo de solicitação de suporte"""
classification_prompt = f"""
Classifique esta solicitação de suporte em uma das categorias:
Categorias: technical, billing, general, complaint, compliment, other
Urgência: low, medium, high, urgent
Sentimento: positive, neutral, negative, angry
Mensagem: "{message}"
Retorne apenas um JSON com: category, urgency, sentiment, confidence
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um classificador de tickets de suporte."},
{"role": "user", "content": classification_prompt}
], {
"temperature": 0.1,
"maxTokens": 150
})
try:
return json.loads(response.message)
except:
return {
"category": "general",
"urgency": "medium",
"sentiment": "neutral",
"confidence": 0.5
}
async def search_knowledge_base(self, query, category):
"""Busca informações relevantes na base de conhecimento"""
# Implementar busca semântica ou tradicional
# Este é um exemplo simplificado
search_results = self.knowledge_base.search(
query=query,
category=category,
limit=3
)
return search_results
def should_escalate_to_human(self, message, classification, user_context):
"""Determina se deve escalar para atendente humano"""
# Verifica palavras-chave de escalação
message_lower = message.lower()
for trigger in self.escalation_triggers:
if trigger in message_lower:
return True
# Escalação baseada em urgência
if classification['urgency'] == 'urgent':
return True
# Escalação baseada em sentimento
if classification['sentiment'] == 'angry':
return True
# Escalação baseada em contexto do usuário
if user_context.get('is_premium_customer', False):
if classification['urgency'] in ['high', 'urgent']:
return True
# Escalação baseada em confiança da classificação
if classification['confidence'] < 0.6:
return True
# Escalação baseada em histórico
recent_tickets = user_context.get('recent_tickets', 0)
if recent_tickets > 3: # Muitos tickets recentes
return True
return False
async def generate_support_response(self, message, classification, knowledge, user_context):
"""Gera resposta de suporte personalizada"""
# Constrói contexto para o prompt
context_info = []
if knowledge:
context_info.append("Informações relevantes da base de conhecimento:")
for item in knowledge:
context_info.append(f"- {item['title']}: {item['content'][:200]}...")
user_info = f"""
Informações do usuário:
- Tipo de conta: {user_context.get('account_type', 'básica')}
- Tempo como cliente: {user_context.get('customer_since', 'novo')}
- Tickets recentes: {user_context.get('recent_tickets', 0)}
"""
support_prompt = f"""
Você é um assistente de suporte técnico amigável e profissional.
Solicitação do usuário: "{message}"
Categoria: {classification['category']}
Urgência: {classification['urgency']}
{chr(10).join(context_info)}
{user_info}
Diretrizes:
1. Seja empático e profissional
2. Forneça soluções específicas quando possível
3. Use informações da base de conhecimento
4. Se não souber, admita e ofereça alternativas
5. Inclua próximos passos claros
6. Mantenha tom adequado à urgência
Responda de forma útil e personalizada:
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um especialista em suporte ao cliente."},
{"role": "user", "content": support_prompt}
], {
"temperature": 0.7,
"maxTokens": 400
})
return {
"message": response.message,
"classification": classification,
"escalated": False,
"knowledge_used": len(knowledge) if knowledge else 0,
"response_type": "automated"
}
async def escalate_to_human(self, message, classification, user_context):
"""Escala ticket para atendente humano"""
# Cria ticket estruturado
ticket_data = {
"user_message": message,
"classification": classification,
"user_context": user_context,
"escalation_reason": self.get_escalation_reason(message, classification),
"priority": self.calculate_priority(classification, user_context),
"suggested_agent": self.suggest_agent(classification),
"created_at": datetime.now().isoformat()
}
# Salva ticket no sistema
ticket_id = await self.create_human_ticket(ticket_data)
# Gera resposta de escalação
escalation_message = f"""
Entendo sua solicitação e vou conectá-lo com um de nossos especialistas.
Seu ticket #{ticket_id} foi criado com prioridade {ticket_data['priority']}.
Um atendente especializado em {classification['category']} entrará em contato
em até {self.get_sla_time(ticket_data['priority'])}.
Enquanto isso, você pode acompanhar o status em nossa central de ajuda.
Há algo mais em que posso ajudar no momento?
"""
return {
"message": escalation_message,
"escalated": True,
"ticket_id": ticket_id,
"estimated_response_time": self.get_sla_time(ticket_data['priority']),
"response_type": "escalated"
}
def get_escalation_reason(self, message, classification):
"""Determina motivo da escalação"""
if classification['sentiment'] == 'angry':
return "customer_frustration"
elif classification['urgency'] == 'urgent':
return "high_urgency"
elif 'humano' in message.lower():
return "human_requested"
else:
return "complex_issue"
def calculate_priority(self, classification, user_context):
"""Calcula prioridade do ticket"""
base_priority = {
'low': 1, 'medium': 2, 'high': 3, 'urgent': 4
}[classification['urgency']]
# Ajusta por tipo de cliente
if user_context.get('is_premium_customer'):
base_priority += 1
# Ajusta por sentimento
if classification['sentiment'] == 'angry':
base_priority += 1
return min(base_priority, 5) # Máximo 5
def get_sla_time(self, priority):
"""Retorna tempo de SLA baseado na prioridade"""
sla_times = {
1: "24 horas",
2: "12 horas",
3: "6 horas",
4: "2 horas",
5: "1 hora"
}
return sla_times.get(priority, "24 horas")
# Exemplo de uso
support_system = IntelligentSupport(openai_client, knowledge_base)
# Simulação de requisição
user_context = {
"user_id": "user_123",
"account_type": "premium",
"customer_since": "2023-01-15",
"recent_tickets": 1,
"is_premium_customer": True
}
response = await support_system.process_support_request(
"Meu pagamento foi cobrado duas vezes e preciso de reembolso urgente!",
user_context
)Gerador de Conteúdo Marketing
Ferramentas de marketing que automatizam criação de conteúdo podem aumentar produtividade em 3x, mantendo consistência de marca e SEO otimizado.
EXPLICAÇÃO DO CÓDIGO
Sistema que gera conteúdo otimizado para diferentes canais, mantendo tom de marca e incluindo SEO automaticamente.
class ContentGenerator:
def __init__(self, openai_client, brand_config):
self.client = openai_client
self.brand = brand_config
self.content_templates = {
'blog_post': {
'min_words': 800,
'seo_focus': True,
'include_cta': True,
'tone': 'informativo'
},
'social_media': {
'max_chars': 280,
'hashtags': True,
'engaging': True,
'tone': 'casual'
},
'email_campaign': {
'subject_variations': 3,
'personalization': True,
'cta_optimization': True,
'tone': 'profissional'
}
}
async def generate_content(self, topic, content_type, target_audience, additional_params=None):
"""Gera conteúdo baseado nos parâmetros"""
template = self.content_templates.get(content_type)
if not template:
raise ValueError(f"Tipo de conteúdo não suportado: {content_type}")
# Pesquisa palavras-chave relacionadas
keywords = await self.research_keywords(topic, content_type)
# Gera conteúdo principal
content = await self.create_content(
topic, content_type, target_audience, keywords, template, additional_params
)
# Otimiza SEO se necessário
if template.get('seo_focus'):
content = await self.optimize_seo(content, keywords)
# Valida qualidade
quality_score = await self.assess_content_quality(content, template)
return {
"content": content,
"keywords": keywords,
"quality_score": quality_score,
"content_type": content_type,
"generated_at": datetime.now().isoformat()
}
async def research_keywords(self, topic, content_type):
"""Pesquisa palavras-chave relacionadas ao tópico"""
keyword_prompt = f"""
Para o tópico "{topic}" e tipo de conteúdo "{content_type}",
sugira 10 palavras-chave relevantes em português brasileiro.
Considere:
- Volume de busca potencial
- Relevância para {self.brand['industry']}
- Intenção de busca do usuário
- Long-tail keywords
Retorne apenas uma lista JSON com as palavras-chave:
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um especialista em SEO e marketing de conteúdo."},
{"role": "user", "content": keyword_prompt}
], {
"temperature": 0.3,
"maxTokens": 200
})
try:
keywords = json.loads(response.message)
return keywords[:10] # Limita a 10
except:
return [topic] # Fallback
async def create_content(self, topic, content_type, audience, keywords, template, params):
"""Cria o conteúdo principal"""
# Constrói prompt personalizado
brand_voice = f"""
Tom da marca {self.brand['name']}:
- Personalidade: {self.brand['personality']}
- Tom: {template['tone']}
- Valores: {', '.join(self.brand['values'])}
- Público-alvo: {audience}
"""
keywords_text = f"Palavras-chave para incluir: {', '.join(keywords[:5])}"
if content_type == 'blog_post':
content = await self.generate_blog_post(topic, brand_voice, keywords_text, template, params)
elif content_type == 'social_media':
content = await self.generate_social_post(topic, brand_voice, keywords_text, template, params)
elif content_type == 'email_campaign':
content = await self.generate_email_campaign(topic, brand_voice, keywords_text, template, params)
else:
content = await self.generate_generic_content(topic, brand_voice, keywords_text, template, params)
return content
async def generate_blog_post(self, topic, brand_voice, keywords, template, params):
"""Gera post de blog completo"""
structure_prompt = f"""
Crie um post de blog completo sobre "{topic}".
{brand_voice}
{keywords}
Estrutura obrigatória:
1. Título SEO-otimizado (H1)
2. Introdução engajante (2-3 parágrafos)
3. 3-5 seções principais (H2) com subseções (H3)
4. Conclusão com CTA
5. Meta descrição (máximo 160 caracteres)
Requisitos:
- Mínimo {template['min_words']} palavras
- Use palavras-chave naturalmente
- Tom {template['tone']}
- Inclua dados e exemplos práticos
- Adicione perguntas para engajamento
Retorne em formato JSON:
{{
"title": "título aqui",
"meta_description": "descrição aqui",
"introduction": "introdução aqui",
"sections": [
{{"heading": "título seção", "content": "conteúdo aqui"}},
],
"conclusion": "conclusão aqui",
"cta": "call-to-action aqui"
}}
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um redator especialista em marketing de conteúdo."},
{"role": "user", "content": structure_prompt}
], {
"temperature": 0.7,
"maxTokens": 2000
})
try:
return json.loads(response.message)
except:
# Fallback para texto simples
return {"content": response.message}
async def generate_social_post(self, topic, brand_voice, keywords, template, params):
"""Gera posts para redes sociais"""
platform = params.get('platform', 'instagram') if params else 'instagram'
social_prompt = f"""
Crie 3 variações de post para {platform} sobre "{topic}".
{brand_voice}
{keywords}
Requisitos por plataforma:
- Instagram: visual, emojis, até 2200 chars, 5-10 hashtags
- Twitter: conciso, até 280 chars, 2-3 hashtags
- LinkedIn: profissional, até 1300 chars, sem muitos hashtags
- Facebook: conversacional, até 500 chars, call-to-action
Para cada variação inclua:
- Texto principal
- Hashtags sugeridas
- Sugestão de visual/imagem
- Melhor horário para postar
Formato JSON:
{{
"variations": [
{{
"text": "texto do post",
"hashtags": ["hashtag1", "hashtag2"],
"image_suggestion": "descrição da imagem",
"best_time": "melhor horário"
}}
]
}}
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um social media manager especialista."},
{"role": "user", "content": social_prompt}
], {
"temperature": 0.8,
"maxTokens": 800
})
try:
return json.loads(response.message)
except:
return {"variations": [{"text": response.message}]}
async def optimize_seo(self, content, keywords):
"""Otimiza conteúdo para SEO"""
if isinstance(content, dict) and 'title' in content:
# Otimiza título
seo_prompt = f"""
Otimize este título para SEO mantendo o sentido:
Título atual: "{content['title']}"
Palavras-chave: {keywords[:3]}
Requisitos:
- Máximo 60 caracteres
- Inclua palavra-chave principal no início
- Seja atrativo para cliques
- Mantenha naturalidade
Retorne apenas o título otimizado:
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um especialista em SEO."},
{"role": "user", "content": seo_prompt}
], {
"temperature": 0.3,
"maxTokens": 100
})
content['seo_title'] = response.message.strip()
return content
async def assess_content_quality(self, content, template):
"""Avalia qualidade do conteúdo gerado"""
content_text = self.extract_text_from_content(content)
quality_prompt = f"""
Avalie a qualidade deste conteúdo em uma escala de 1-10:
{content_text[:1000]}...
Critérios de avaliação:
- Clareza e coerência (25%)
- Engajamento e interesse (25%)
- Relevância ao tópico (25%)
- Qualidade da escrita (25%)
Retorne apenas um número de 1 a 10:
"""
response = await self.client.createChatCompletion([
{"role": "system", "content": "Você é um avaliador de qualidade de conteúdo."},
{"role": "user", "content": quality_prompt}
], {
"temperature": 0.1,
"maxTokens": 50
})
try:
score = float(response.message.strip())
return max(1, min(10, score))
except:
return 7.0 # Score médio como fallback
def extract_text_from_content(self, content):
"""Extrai texto do conteúdo estruturado"""
if isinstance(content, dict):
text_parts = []
for key, value in content.items():
if isinstance(value, str):
text_parts.append(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
text_parts.extend(item.values())
return ' '.join(text_parts)
return str(content)
# Configuração da marca
brand_config = {
"name": "TechStart",
"industry": "Tecnologia",
"personality": "Inovadora, acessível, confiável",
"values": ["Inovação", "Transparência", "Excelência"],
"target_audience": "Empreendedores e startups"
}
# Uso do sistema
content_gen = ContentGenerator(openai_client, brand_config)
# Gera blog post
blog_result = await content_gen.generate_content(
topic="Como escolher tecnologias para sua startup",
content_type="blog_post",
target_audience="Empreendedores iniciantes",
additional_params={"focus_keywords": ["startup", "tecnologia", "empreendedorismo"]}
)
print(f"Qualidade do conteúdo: {blog_result['quality_score']}/10")87%
de eficiência
Aumento médio de produtividade em marketing de conteúdo com automação IA
Obrigado por ler!
Agora você tem as ferramentas necessárias para integrar ChatGPT e OpenAI API em suas aplicações web de forma segura e eficiente. Com as técnicas apresentadas, pode criar desde chatbots simples até sistemas complexos de IA.
Dúvidas? Deixe um comentário!