로봇,ROS,SLAM
OpenVLA Tutorial 2
- -
git clone 한 openvla 폴더에서
vla-scripts/deploy.py
파일에서 아래와 같이 코드를 바꿔준다.
"""
deploy.py
Provide a lightweight server/client implementation for deploying OpenVLA models (through the HF AutoClass API) over a
REST API. This script implements *just* the server, with specific dependencies and instructions below.
Note that for the *client*, usage just requires numpy/json-numpy, and requests; example usage below!
Dependencies:
=> Server (runs OpenVLA model on GPU): `pip install uvicorn fastapi json-numpy`
=> Client: `pip install requests json-numpy`
Client (Standalone) Usage (assuming a server running on 0.0.0.0:8000):
```
import requests
import json_numpy
json_numpy.patch()
import numpy as np
action = requests.post(
"http://0.0.0.0:8000/act",
json={"image": np.zeros((256, 256, 3), dtype=np.uint8), "instruction": "do something"}
).json()
```
Note that if your server is not accessible on the open web, you can use ngrok, or forward ports to your client via ssh:
=> `ssh -L 8000:localhost:8000 USER@<SERVER_IP>`
"""
import os.path
# ruff: noqa: E402
import json_numpy
json_numpy.patch()
import json
import logging
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Union
import draccus
import torch
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig
# === Utilities ===
# System preamble prepended to the prompt for "v01" checkpoints only.
SYSTEM_PROMPT = (
    "A chat between a curious user and an artificial intelligence assistant. "
    "The assistant gives helpful, detailed, and polite answers to the user's questions."
)


def get_openvla_prompt(instruction: str, openvla_path: Union[str, Path]) -> str:
    """Build the text prompt for an OpenVLA checkpoint.

    Args:
        instruction: Natural-language task description; it is lower-cased before being inserted.
        openvla_path: HF Hub id or local directory of the checkpoint. If it contains the
            substring "v01", the chat-style prompt (with `SYSTEM_PROMPT`) is used; otherwise
            the plain "In: ... Out:" prompt is used.

    Returns:
        The formatted prompt string.
    """
    # `"v01" in <Path>` raises TypeError (Path is not a container), so normalise to str first.
    if "v01" in str(openvla_path):
        return f"{SYSTEM_PROMPT} USER: What action should the robot take to {instruction.lower()}? ASSISTANT:"
    return f"In: What action should the robot take to {instruction.lower()}?\nOut:"
# === Server Interface ===
class OpenVLAServer:
    """A simple server for OpenVLA models; exposes `/act` to predict an action for a given image + instruction.

    => Takes in {"image": np.ndarray, "instruction": str, "unnorm_key": Optional[str]}
       (or {"encoded": "<json string of the above>"} for clients without `json_numpy`)
    => Responds with the predicted action serialized as JSON, or the string "error" on failure
    """

    def __init__(self, openvla_path: Union[str, Path], attn_implementation: Optional[str] = "flash_attention_2") -> None:
        """Load the processor and the 4-bit quantized model.

        Args:
            openvla_path: HF Hub id or local directory of the checkpoint.
            attn_implementation: Forwarded to `AutoModelForVision2Seq.from_pretrained`.
        """
        self.openvla_path, self.attn_implementation = openvla_path, attn_implementation
        self.device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

        # Load VLA Model using HF AutoClasses
        self.processor = AutoProcessor.from_pretrained(self.openvla_path, trust_remote_code=True)

        # 4-bit quantized load. To go back to full bfloat16, drop `quantization_config`, use
        # `torch_dtype=torch.bfloat16`, and move the model with `.to(self.device)`.
        # NOTE(review): no explicit `.to(self.device)` here -- presumably the quantized loader
        # handles device placement itself; confirm against the transformers quantization docs.
        print("[*] Loading in 4-Bit Quantization Mode")
        self.vla = AutoModelForVision2Seq.from_pretrained(
            self.openvla_path,
            attn_implementation=attn_implementation,
            torch_dtype=torch.float16,
            quantization_config=BitsAndBytesConfig(load_in_4bit=True),
            low_cpu_mem_usage=True,
            trust_remote_code=True,
        )

        # [Hacky] Load Dataset Statistics from Disk (if passing a path to a fine-tuned model)
        if os.path.isdir(self.openvla_path):
            with open(Path(self.openvla_path) / "dataset_statistics.json", "r") as f:
                self.vla.norm_stats = json.load(f)

    # NOTE(review): the `-> str` annotation is kept as-is even though the success path returns a
    # JSONResponse; FastAPI inspects handler return annotations, so confirm before changing it.
    def predict_action(self, payload: Dict[str, Any]) -> str:
        """Run one inference step for the `/act` endpoint.

        Args:
            payload: Request body with "image", "instruction" and optionally "unnorm_key",
                or a single "encoded" key holding that body as a JSON string.

        Returns:
            A `JSONResponse` carrying the predicted action, or the string "error" if anything
            in parsing or inference raised.
        """
        try:
            double_encode = "encoded" in payload
            if double_encode:
                # Support cases where `json_numpy` is hard to install, and numpy arrays are "double-encoded" as strings
                assert len(payload.keys()) == 1, "Only uses encoded payload!"
                payload = json.loads(payload["encoded"])

            # Parse payload components
            image, instruction = payload["image"], payload["instruction"]
            unnorm_key = payload.get("unnorm_key", None)
            print('instruction:', instruction)

            # Run VLA Inference (str() so a Path-valued `openvla_path` is accepted by the prompt helper)
            prompt = get_openvla_prompt(instruction, str(self.openvla_path))

            # float16 inputs to match the 8-bit/4-bit quantized model (use torch.bfloat16 in bfloat16 mode)
            inputs = self.processor(prompt, Image.fromarray(image).convert("RGB")).to(self.device, dtype=torch.float16)
            action = self.vla.predict_action(**inputs, unnorm_key=unnorm_key, do_sample=False)
            print('action:', action)

            if double_encode:
                return JSONResponse(json_numpy.dumps(action))
            return JSONResponse(action)
        except Exception:
            # Deliberately broad at the request boundary, but no longer a bare `except:` so that
            # KeyboardInterrupt / SystemExit still propagate and the server can be stopped.
            logging.error(traceback.format_exc())
            logging.warning(
                "Your request threw an error; make sure your request complies with the expected format:\n"
                "{'image': np.ndarray, 'instruction': str}\n"
                "You can optionally an `unnorm_key: str` to specific the dataset statistics you want to use for "
                "de-normalizing the output actions."
            )
            return "error"

    def run(self, host: str = "0.0.0.0", port: int = 8000) -> None:
        """Create the FastAPI app, register `predict_action` on POST `/act`, and serve with uvicorn."""
        self.app = FastAPI()
        self.app.post("/act")(self.predict_action)
        uvicorn.run(self.app, host=host, port=port)
@dataclass
class DeployConfig:
    """Settings consumed by `deploy`: which checkpoint to serve and where to bind the server."""

    # fmt: off
    openvla_path: Union[str, Path] = "openvla/openvla-7b"               # HF Hub Path (or path to local run directory)

    # Server Configuration
    host: str = "0.0.0.0"                                               # Host IP Address
    port: int = 8000                                                    # Host Port
    # fmt: on
@draccus.wrap()
def deploy(cfg: DeployConfig) -> None:
    """Build an `OpenVLAServer` for `cfg.openvla_path` and serve it on `cfg.host`:`cfg.port`."""
    OpenVLAServer(cfg.openvla_path).run(cfg.host, port=cfg.port)


if __name__ == "__main__":
    deploy()
주요변화는 4-Bit 양자화 모드를 사용하였다는 것이다.
다음으로 root 위치에 gradio_app.py 파일을 만들고 아래와 같은 코드를 넣어준다. (파일 이름을 gradio.py 로 하면 `import gradio` 가 설치된 패키지 대신 이 파일 자신을 불러와 오류가 나므로 다른 이름을 사용한다.)
import gradio as gr
import requests
import json_numpy
import numpy as np
from PIL import Image
# Handle the data format exchanged between the Gradio client and the server
json_numpy.patch()
# REST API server endpoint
API_URL = "http://localhost:8000/act"
def predict_action(image, instruction, unnorm_key=None, timeout=60.0):
    """Send an image + instruction to the `/act` endpoint and return the server's answer.

    Args:
        image: The uploaded image (anything `np.array` can convert); `None` when nothing was uploaded.
        instruction: Natural-language instruction forwarded to the server.
        unnorm_key: Optional dataset-statistics key; omitted from the request when empty.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a human-readable error string.
    """
    # Submitting without an upload yields None; np.array(None) would send a meaningless payload.
    if image is None:
        return "Error: no image provided. Upload an image before submitting."

    # Build the request payload; the image is converted to a numpy array
    payload = {
        "image": np.array(image),
        "instruction": instruction,
    }
    if unnorm_key:
        payload["unnorm_key"] = unnorm_key

    # POST to the server; report connection problems and timeouts as text instead of raising
    try:
        response = requests.post(API_URL, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        return f"Error: request to {API_URL} failed: {exc}"

    # Check the server response
    if response.status_code == 200:
        return response.json()
    return f"Error {response.status_code}: {response.text}"
# Build the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# OpenVLA Robot Action Prediction")
    gr.Markdown(
        "Provide an image of the robot workspace and an instruction to predict the robot's action. "
        "You can either upload an image or provide a base64-encoded image via API."
    )

    with gr.Row():
        # Left column: request inputs
        with gr.Column(scale=1):
            instruction_input = gr.Textbox(label="Instruction", placeholder="e.g., Pick up the remote")
            unnorm_key_input = gr.Textbox(label="Unnorm Key (Optional)", placeholder="e.g., bridge_orig")
            image_input = gr.Image(type="pil", label="Upload Image")
            submit_btn = gr.Button("Submit")

        # Right column: predicted action. The label lists all seven returned values
        # (the original label omitted the gripper dimension).
        with gr.Column(scale=1):
            output_action = gr.Textbox(
                label="Robot Action (X, Y, Z, Roll, Pitch, Yaw, Gripper)",
                interactive=False,
                lines=8,
            )

    # Wire the button to the prediction function
    submit_btn.click(
        fn=predict_action,
        inputs=[image_input, instruction_input, unnorm_key_input],
        outputs=[output_action],
    )

    # Provide examples (each row is ordered as instruction, unnorm key, image path)
    gr.Examples(
        examples=[
            ["Place the red vegetable in the silver pot.", "bridge_orig", "./images/bridge_example.jpeg"],
            ["Pick up the remote", "bridge_orig", "./images/bridge_orig.jpeg"],
        ],
        inputs=[instruction_input, unnorm_key_input, image_input],
    )

demo.launch()
X, Y, Z, Roll, Pitch, Yaw, Gripper 값이 나왔다.
'로봇,ROS,SLAM' 카테고리의 다른 글
[Genesis Part1] Genesis 알아보고 Ubuntu에서 구동해보기 (0) | 2025.01.01 |
---|---|
OpenVLA Tutorial 3 (0) | 2024.11.23 |
OpenVLA Tutorial 01 (1) | 2024.11.19 |
Issac Sim 설치 and ROS URDF 불러오기 (2) | 2024.11.17 |
[ROS2] ROS2 Humble 설치 및 alias로 bashrc 설정 (0) | 2024.11.17 |
Contents
소중한 공감 감사합니다