
OpenVLA Tutorial 2


In the openvla folder you cloned with git, open

 

vla-scripts/deploy.py

 

and change the code in that file as shown below.

 

"""
deploy.py

Provide a lightweight server/client implementation for deploying OpenVLA models (through the HF AutoClass API) over a
REST API. This script implements *just* the server, with specific dependencies and instructions below.

Note that for the *client*, usage just requires numpy/json-numpy, and requests; example usage below!

Dependencies:
    => Server (runs OpenVLA model on GPU): `pip install uvicorn fastapi json-numpy`
    => Client: `pip install requests json-numpy`

Client (Standalone) Usage (assuming a server running on 0.0.0.0:8000):

```
import requests
import json_numpy
json_numpy.patch()
import numpy as np

action = requests.post(
    "http://0.0.0.0:8000/act",
    json={"image": np.zeros((256, 256, 3), dtype=np.uint8), "instruction": "do something"}
).json()
```

Note that if your server is not accessible on the open web, you can use ngrok, or forward ports to your client via ssh:
    => `ssh -L 8000:localhost:8000 USER@<SERVER_IP>`
"""

import os.path

# ruff: noqa: E402
import json_numpy

json_numpy.patch()
import json
import logging
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Union

import draccus
import torch
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig

# === Utilities ===
SYSTEM_PROMPT = (
    "A chat between a curious user and an artificial intelligence assistant. "
    "The assistant gives helpful, detailed, and polite answers to the user's questions."
)


def get_openvla_prompt(instruction: str, openvla_path: Union[str, Path]) -> str:
    if "v01" in openvla_path:
        return f"{SYSTEM_PROMPT} USER: What action should the robot take to {instruction.lower()}? ASSISTANT:"
    else:
        return f"In: What action should the robot take to {instruction.lower()}?\nOut:"


# === Server Interface ===
class OpenVLAServer:
    def __init__(self, openvla_path: Union[str, Path], attn_implementation: Optional[str] = "flash_attention_2") -> None:
        """
        A simple server for OpenVLA models; exposes `/act` to predict an action for a given image + instruction.
            => Takes in {"image": np.ndarray, "instruction": str, "unnorm_key": Optional[str]}
            => Returns  {"action": np.ndarray}
        """
        self.openvla_path, self.attn_implementation = openvla_path, attn_implementation
        self.device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

        # Load VLA Model using HF AutoClasses
        self.processor = AutoProcessor.from_pretrained(self.openvla_path, trust_remote_code=True)
        
        # self.vla = AutoModelForVision2Seq.from_pretrained(
        #     self.openvla_path,
        #     attn_implementation=attn_implementation,
        #     torch_dtype=torch.bfloat16,
        #     low_cpu_mem_usage=True,
        #     trust_remote_code=True,
        # ).to(self.device)
        
        print("[*] Loading in 4-Bit Quantization Mode")
        self.vla = AutoModelForVision2Seq.from_pretrained(
            self.openvla_path,
            attn_implementation=attn_implementation,
            torch_dtype=torch.float16,
            quantization_config=BitsAndBytesConfig(load_in_4bit=True),
            low_cpu_mem_usage=True,
            trust_remote_code=True,
        )

        # [Hacky] Load Dataset Statistics from Disk (if passing a path to a fine-tuned model)
        if os.path.isdir(self.openvla_path):
            with open(Path(self.openvla_path) / "dataset_statistics.json", "r") as f:
                self.vla.norm_stats = json.load(f)

    def predict_action(self, payload: Dict[str, Any]) -> str:
        try:
            if double_encode := "encoded" in payload:
                # Support cases where `json_numpy` is hard to install, and numpy arrays are "double-encoded" as strings
                assert len(payload.keys()) == 1, "Only uses encoded payload!"
                payload = json.loads(payload["encoded"])

            # Parse payload components
            image, instruction = payload["image"], payload["instruction"]
            unnorm_key = payload.get("unnorm_key", None)
            print('instruction:', instruction)
            # Run VLA Inference
            prompt = get_openvla_prompt(instruction, self.openvla_path)
            # === BFLOAT16 MODE ===
            # inputs = self.processor(prompt, Image.fromarray(image).convert("RGB")).to(self.device, dtype=torch.bfloat16)
            # === 8-BIT/4-BIT QUANTIZATION MODE ===
            inputs = self.processor(prompt, Image.fromarray(image).convert("RGB")).to(self.device, dtype=torch.float16)
            action = self.vla.predict_action(**inputs, unnorm_key=unnorm_key, do_sample=False)
            
            print('action:', action)
            if double_encode:
                return JSONResponse(json_numpy.dumps(action))
            else:
                return JSONResponse(action)
        except:  # noqa: E722
            logging.error(traceback.format_exc())
            logging.warning(
                "Your request threw an error; make sure your request complies with the expected format:\n"
                "{'image': np.ndarray, 'instruction': str}\n"
                "You can optionally an `unnorm_key: str` to specific the dataset statistics you want to use for "
                "de-normalizing the output actions."
            )
            return "error"

    def run(self, host: str = "0.0.0.0", port: int = 8000) -> None:
        self.app = FastAPI()
        self.app.post("/act")(self.predict_action)
        uvicorn.run(self.app, host=host, port=port)


@dataclass
class DeployConfig:
    # fmt: off
    openvla_path: Union[str, Path] = "openvla/openvla-7b"               # HF Hub Path (or path to local run directory)

    # Server Configuration
    host: str = "0.0.0.0"                                               # Host IP Address
    port: int = 8000                                                    # Host Port

    # fmt: on


@draccus.wrap()
def deploy(cfg: DeployConfig) -> None:
    server = OpenVLAServer(cfg.openvla_path)
    server.run(cfg.host, port=cfg.port)


if __name__ == "__main__":
    deploy()

 

The main change is that the model is loaded in 4-bit quantization mode. Note that the .to(self.device) call from the bfloat16 path is dropped here, because bitsandbytes already places the quantized weights on the GPU at load time.
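
For reference, here is the key piece of the change in isolation: a minimal sketch of loading OpenVLA in 4-bit, assuming the bitsandbytes package is installed alongside the server dependencies (pip install bitsandbytes). The checkpoint path is the same default used in DeployConfig.

import torch
from transformers import AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig

MODEL_PATH = "openvla/openvla-7b"

processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)

# bitsandbytes places the 4-bit weights on the GPU at load time,
# so no explicit .to(device) call is made on the model.
vla = AutoModelForVision2Seq.from_pretrained(
    MODEL_PATH,
    torch_dtype=torch.float16,
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    low_cpu_mem_usage=True,
    trust_remote_code=True,
)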

 

Next, create a client script in the repository root and add the code below. (Name it something like gradio_client.py rather than gradio.py: a file named gradio.py would shadow the gradio package and break the import gradio line.)

 

import gradio as gr
import requests
import json_numpy
import numpy as np
from PIL import Image

# Patch json so numpy arrays can be exchanged between the Gradio client and the server
json_numpy.patch()

# REST API server endpoint
API_URL = "http://localhost:8000/act"

def predict_action(image, instruction, unnorm_key=None):
    # Convert the uploaded image to a numpy array
    image_array = np.array(image)

    # Build the request payload
    payload = {
        "image": image_array,
        "instruction": instruction,
    }

    if unnorm_key:
        payload["unnorm_key"] = unnorm_key

    # POST the request to the server
    response = requests.post(API_URL, json=payload)
    
    # Check the server response
    if response.status_code == 200:
        return response.json()
    else:
        return f"Error {response.status_code}: {response.text}"

# Build the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# OpenVLA Robot Action Prediction")
    gr.Markdown(
        "Provide an image of the robot workspace and an instruction to predict the robot's action. "
        "You can also send requests directly to the /act REST endpoint (see vla-scripts/deploy.py)."
    )

    with gr.Row():
        with gr.Column(scale=1):
            instruction_input = gr.Textbox(label="Instruction", placeholder="e.g., Pick up the remote")
            unnorm_key_input = gr.Textbox(label="Unnorm Key (Optional)", placeholder="e.g., bridge_orig")
            image_input = gr.Image(type="pil", label="Upload Image")
            submit_btn = gr.Button("Submit")

        with gr.Column(scale=1):
            output_action = gr.Textbox(label="Robot Action (X, Y, Z, Roll, Pitch, Yaw, Gripper)", interactive=False, lines=8)
    

    # Wire the prediction function to the button
    submit_btn.click(
        fn=predict_action,
        inputs=[image_input, instruction_input, unnorm_key_input],
        outputs=[output_action]
    )

    # Provide example inputs
    gr.Examples(
        examples=[
            ["Place the red vegetable in the silver pot.", "bridge_orig", "./images/bridge_example.jpeg"],
            ["Pick up the remote", "bridge_orig", "./images/bridge_orig.jpeg"]
        ],
        inputs=[instruction_input, unnorm_key_input, image_input]
    )

demo.launch()
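
With both files in place, start the server first and then the client. Assuming the client script is named gradio_client.py and gradio is installed (pip install gradio), run python vla-scripts/deploy.py in one terminal to launch the REST server on port 8000, then run python gradio_client.py in another and open the Gradio UI it prints (http://localhost:7860 by default).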


The output gives X, Y, Z, Roll, Pitch, Yaw, and Gripper values.
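
As a reference for reading this output, here is a minimal client-side sketch. It assumes the usual 7-dimensional OpenVLA action ordering (delta position, delta rotation, gripper) and a server running locally on port 8000; the dummy image and the unnorm_key are placeholders.

import json_numpy
json_numpy.patch()
import numpy as np
import requests

# Query the server; in practice, send a real image of the robot workspace.
action = np.array(
    requests.post(
        "http://localhost:8000/act",
        json={
            "image": np.zeros((256, 256, 3), dtype=np.uint8),
            "instruction": "pick up the remote",
            "unnorm_key": "bridge_orig",
        },
    ).json()
)

# Assumed ordering: dx, dy, dz, droll, dpitch, dyaw, gripper
dx, dy, dz, droll, dpitch, dyaw, gripper = action
print("delta position:", dx, dy, dz)
print("delta rotation:", droll, dpitch, dyaw)
print("gripper:", gripper)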
