From 8be9e258fc7d1110421aaee451945668cafa23e7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 19 Nov 2025 13:49:22 -0800 Subject: [PATCH] S3: Add tests for PyArrow with native S3 filesystem (#7508) * PyArrow native S3 filesystem * add sse-s3 tests * update * minor * ENABLE_SSE_S3 * Update test_pyarrow_native_s3.py * clean up * refactoring * Update test_pyarrow_native_s3.py --- .github/workflows/s3-parquet-tests.yml | 22 ++ test/s3/parquet/Makefile | 92 ++++- test/s3/parquet/README.md | 87 ++++- test/s3/parquet/example_pyarrow_native.py | 134 +++++++ test/s3/parquet/parquet_test_utils.py | 41 ++ test/s3/parquet/test_pyarrow_native_s3.py | 383 +++++++++++++++++++ test/s3/parquet/test_sse_s3_compatibility.py | 254 ++++++++++++ 7 files changed, 1008 insertions(+), 5 deletions(-) create mode 100755 test/s3/parquet/example_pyarrow_native.py create mode 100644 test/s3/parquet/parquet_test_utils.py create mode 100755 test/s3/parquet/test_pyarrow_native_s3.py create mode 100755 test/s3/parquet/test_sse_s3_compatibility.py diff --git a/.github/workflows/s3-parquet-tests.yml b/.github/workflows/s3-parquet-tests.yml index 8fbd062ef..7c90c984f 100644 --- a/.github/workflows/s3-parquet-tests.yml +++ b/.github/workflows/s3-parquet-tests.yml @@ -86,6 +86,28 @@ jobs: VOLUME_PORT: 8080 MASTER_PORT: 9333 + - name: Run PyArrow native S3 filesystem tests + run: | + cd test/s3/parquet + make test-native-s3-with-server + env: + SEAWEEDFS_BINARY: weed + S3_PORT: 8333 + FILER_PORT: 8888 + VOLUME_PORT: 8080 + MASTER_PORT: 9333 + + - name: Run SSE-S3 encryption compatibility tests + run: | + cd test/s3/parquet + make test-sse-s3-compat + env: + SEAWEEDFS_BINARY: weed + S3_PORT: 8333 + FILER_PORT: 8888 + VOLUME_PORT: 8080 + MASTER_PORT: 9333 + - name: Upload test logs on failure if: failure() uses: actions/upload-artifact@v4 diff --git a/test/s3/parquet/Makefile b/test/s3/parquet/Makefile index dd65b6e9f..bd79d1747 100644 --- a/test/s3/parquet/Makefile +++ b/test/s3/parquet/Makefile @@ -13,6 +13,7 @@ SECRET_KEY ?= some_secret_key1 VOLUME_MAX_SIZE_MB ?= 50 VOLUME_MAX_COUNT ?= 100 BUCKET_NAME ?= test-parquet-bucket +ENABLE_SSE_S3 ?= false # Python configuration PYTHON ?= python3 @@ -29,7 +30,7 @@ GREEN := \033[0;32m YELLOW := \033[1;33m NC := \033[0m # No Color -.PHONY: all build-weed check-binary check-python ci-test clean debug-logs debug-status help manual-start manual-stop setup-python start-seaweedfs start-seaweedfs-ci stop-seaweedfs stop-seaweedfs-safe test test-implicit-dir test-implicit-dir-with-server test-quick test-with-server +.PHONY: all build-weed check-binary check-python ci-test clean debug-logs debug-status help manual-start manual-stop setup-python start-seaweedfs start-seaweedfs-ci stop-seaweedfs stop-seaweedfs-safe test test-implicit-dir test-implicit-dir-with-server test-native-s3 test-native-s3-with-server test-native-s3-with-sse test-quick test-sse-s3-compat test-with-server all: test @@ -48,6 +49,10 @@ help: @echo " test-quick - Run quick tests with small files only (sets TEST_QUICK=1)" @echo " test-implicit-dir - Test implicit directory fix for s3fs compatibility" @echo " test-implicit-dir-with-server - Test implicit directory fix with server management" + @echo " test-native-s3 - Test PyArrow's native S3 filesystem (assumes server running)" + @echo " test-native-s3-with-server - Test PyArrow's native S3 filesystem with server management" + @echo " test-native-s3-with-sse - Test PyArrow's native S3 with SSE-S3 encryption enabled" + @echo " test-sse-s3-compat - Comprehensive SSE-S3 compatibility test (multipart uploads)" @echo " setup-python - Setup Python virtual environment and install dependencies" @echo " check-python - Check if Python and required packages are available" @echo " start-seaweedfs - Start SeaweedFS server for testing" @@ -66,6 +71,7 @@ help: @echo " MASTER_PORT=$(MASTER_PORT)" @echo " BUCKET_NAME=$(BUCKET_NAME)" @echo " VOLUME_MAX_SIZE_MB=$(VOLUME_MAX_SIZE_MB)" + @echo " ENABLE_SSE_S3=$(ENABLE_SSE_S3)" @echo " PYTHON=$(PYTHON)" check-binary: @@ -131,7 +137,13 @@ start-seaweedfs-ci: check-binary # Start filer server with embedded S3 @echo "Starting filer server with embedded S3..." - @printf '{"identities":[{"name":"%s","credentials":[{"accessKey":"%s","secretKey":"%s"}],"actions":["Admin","Read","Write"]}]}' "$(ACCESS_KEY)" "$(ACCESS_KEY)" "$(SECRET_KEY)" > /tmp/seaweedfs-parquet-s3.json + @if [ "$(ENABLE_SSE_S3)" = "true" ]; then \ + echo " SSE-S3 encryption: ENABLED"; \ + printf '{"identities":[{"name":"%s","credentials":[{"accessKey":"%s","secretKey":"%s"}],"actions":["Admin","Read","Write"]}],"buckets":[{"name":"$(BUCKET_NAME)","encryption":{"sseS3":{"enabled":true}}}]}' "$(ACCESS_KEY)" "$(ACCESS_KEY)" "$(SECRET_KEY)" > /tmp/seaweedfs-parquet-s3.json; \ + else \ + echo " SSE-S3 encryption: DISABLED"; \ + printf '{"identities":[{"name":"%s","credentials":[{"accessKey":"%s","secretKey":"%s"}],"actions":["Admin","Read","Write"]}]}' "$(ACCESS_KEY)" "$(ACCESS_KEY)" "$(SECRET_KEY)" > /tmp/seaweedfs-parquet-s3.json; \ + fi @AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) nohup $(SEAWEEDFS_BINARY) filer -port=$(FILER_PORT) -port.grpc=$$(( $(FILER_PORT) + 10000 )) -master=127.0.0.1:$(MASTER_PORT) -dataCenter=defaultDataCenter -ip=127.0.0.1 -s3 -s3.port=$(S3_PORT) -s3.config=/tmp/seaweedfs-parquet-s3.json > /tmp/seaweedfs-parquet-filer.log 2>&1 & @sleep 5 @@ -274,7 +286,6 @@ test-with-server: build-weed setup-python BUCKET_NAME=$(BUCKET_NAME) \ $(VENV_DIR)/bin/$(PYTHON) $(PYTHON_TEST_SCRIPT) || exit 1; \ echo "āœ… All tests completed successfully"; \ - $(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true; \ else \ echo "āŒ Failed to start SeaweedFS cluster"; \ echo "=== Server startup logs ==="; \ @@ -329,7 +340,6 @@ test-implicit-dir-with-server: build-weed setup-python BUCKET_NAME=test-implicit-dir \ $(VENV_DIR)/bin/$(PYTHON) test_implicit_directory_fix.py || exit 1; \ echo "āœ… All tests completed successfully"; \ - $(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true; \ else \ echo "āŒ Failed to start SeaweedFS cluster"; \ echo "=== Server startup logs ==="; \ @@ -360,6 +370,80 @@ manual-start: start-seaweedfs manual-stop: stop-seaweedfs clean +# Test PyArrow's native S3 filesystem +test-native-s3: setup-python + @echo "$(YELLOW)Running PyArrow native S3 filesystem tests...$(NC)" + @echo "$(YELLOW)Assuming SeaweedFS is already running on localhost:$(S3_PORT)$(NC)" + @S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) test_pyarrow_native_s3.py + +# Test PyArrow's native S3 filesystem with automatic server management +test-native-s3-with-server: build-weed setup-python + @echo "šŸš€ Starting PyArrow native S3 filesystem tests with automated server management..." + @echo "Starting SeaweedFS cluster..." + @if $(MAKE) start-seaweedfs-ci > weed-test.log 2>&1; then \ + echo "āœ… SeaweedFS cluster started successfully"; \ + echo "Running PyArrow native S3 filesystem tests..."; \ + trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) test_pyarrow_native_s3.py || exit 1; \ + echo "āœ… All tests completed successfully"; \ + else \ + echo "āŒ Failed to start SeaweedFS cluster"; \ + echo "=== Server startup logs ==="; \ + tail -100 weed-test.log 2>/dev/null || echo "No startup log available"; \ + exit 1; \ + fi + +# Test PyArrow's native S3 filesystem compatibility with SSE-S3 enabled backend +# (For encryption-specific validation, use test-sse-s3-compat) +test-native-s3-with-sse: build-weed setup-python + @echo "šŸš€ Testing PyArrow native S3 compatibility with SSE-S3 enabled backend..." + @echo "Starting SeaweedFS cluster with SSE-S3 enabled..." + @if $(MAKE) start-seaweedfs-ci ENABLE_SSE_S3=true > weed-test-sse.log 2>&1; then \ + echo "āœ… SeaweedFS cluster started successfully with SSE-S3"; \ + echo "Running PyArrow native S3 filesystem tests with SSE-S3..."; \ + trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) test_pyarrow_native_s3.py || exit 1; \ + echo "āœ… All SSE-S3 tests completed successfully"; \ + else \ + echo "āŒ Failed to start SeaweedFS cluster with SSE-S3"; \ + echo "=== Server startup logs ==="; \ + tail -100 weed-test-sse.log 2>/dev/null || echo "No startup log available"; \ + exit 1; \ + fi + +# Comprehensive SSE-S3 compatibility test +test-sse-s3-compat: build-weed setup-python + @echo "šŸš€ Starting comprehensive SSE-S3 compatibility tests..." + @echo "Starting SeaweedFS cluster with SSE-S3 enabled..." + @if $(MAKE) start-seaweedfs-ci ENABLE_SSE_S3=true > weed-test-sse-compat.log 2>&1; then \ + echo "āœ… SeaweedFS cluster started successfully with SSE-S3"; \ + echo "Running comprehensive SSE-S3 compatibility tests..."; \ + trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) test_sse_s3_compatibility.py || exit 1; \ + echo "āœ… All SSE-S3 compatibility tests completed successfully"; \ + else \ + echo "āŒ Failed to start SeaweedFS cluster with SSE-S3"; \ + echo "=== Server startup logs ==="; \ + tail -100 weed-test-sse-compat.log 2>/dev/null || echo "No startup log available"; \ + exit 1; \ + fi + # CI/CD targets ci-test: test-with-server diff --git a/test/s3/parquet/README.md b/test/s3/parquet/README.md index 48ce3e6fc..ed65e4cbb 100644 --- a/test/s3/parquet/README.md +++ b/test/s3/parquet/README.md @@ -10,6 +10,22 @@ SeaweedFS implements implicit directory detection to improve compatibility with ## Quick Start +### Running the Example Script + +```bash +# Start SeaweedFS server +make start-seaweedfs-ci + +# Run the example script +python3 example_pyarrow_native.py + +# Or with uv (if available) +uv run example_pyarrow_native.py + +# Stop the server when done +make stop-seaweedfs-safe +``` + ### Running Tests ```bash @@ -25,12 +41,20 @@ make test-quick # Run implicit directory fix tests make test-implicit-dir-with-server +# Run PyArrow native S3 filesystem tests +make test-native-s3-with-server + +# Run SSE-S3 encryption tests +make test-sse-s3-compat + # Clean up make clean ``` ### Using PyArrow with SeaweedFS +#### Option 1: Using s3fs (recommended for compatibility) + ```python import pyarrow as pa import pyarrow.parquet as pq @@ -55,13 +79,55 @@ table = pq.read_table('bucket/dataset', filesystem=fs) # āœ… dataset = pq.ParquetDataset('bucket/dataset', filesystem=fs) # āœ… ``` +#### Option 2: Using PyArrow's native S3 filesystem (pure PyArrow) + +```python +import pyarrow as pa +import pyarrow.parquet as pq +import pyarrow.dataset as pads +import pyarrow.fs as pafs + +# Configure PyArrow's native S3 filesystem +s3 = pafs.S3FileSystem( + access_key='your_access_key', + secret_key='your_secret_key', + endpoint_override='localhost:8333', + scheme='http', + allow_bucket_creation=True, + allow_bucket_deletion=True +) + +# Write dataset +table = pa.table({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) +pads.write_dataset(table, 'bucket/dataset', filesystem=s3) + +# Read dataset (all methods work!) +table = pq.read_table('bucket/dataset', filesystem=s3) # āœ… +dataset = pq.ParquetDataset('bucket/dataset', filesystem=s3) # āœ… +dataset = pads.dataset('bucket/dataset', filesystem=s3) # āœ… +``` + ## Test Files ### Main Test Suite - **`s3_parquet_test.py`** - Comprehensive PyArrow test suite - Tests 2 write methods Ɨ 5 read methods Ɨ 2 dataset sizes = 20 combinations + - Uses s3fs library for S3 operations - All tests pass with the implicit directory fix āœ… +### PyArrow Native S3 Tests +- **`test_pyarrow_native_s3.py`** - PyArrow's native S3 filesystem tests + - Tests PyArrow's built-in S3FileSystem (pyarrow.fs.S3FileSystem) + - Pure PyArrow solution without s3fs dependency + - Tests 3 read methods Ɨ 2 dataset sizes = 6 scenarios + - All tests pass āœ… + +- **`test_sse_s3_compatibility.py`** - SSE-S3 encryption compatibility tests + - Tests PyArrow native S3 with SSE-S3 server-side encryption + - Tests 5 different file sizes (10 to 500,000 rows) + - Verifies multipart upload encryption works correctly + - All tests pass āœ… + ### Implicit Directory Tests - **`test_implicit_directory_fix.py`** - Specific tests for the implicit directory fix - Tests HEAD request behavior @@ -69,6 +135,12 @@ dataset = pq.ParquetDataset('bucket/dataset', filesystem=fs) # āœ… - Tests PyArrow dataset reading - All 6 tests pass āœ… +### Examples +- **`example_pyarrow_native.py`** - Simple standalone example + - Demonstrates PyArrow's native S3 filesystem usage + - Can be run with `uv run` or regular Python + - Minimal dependencies (pyarrow, boto3) + ### Configuration - **`Makefile`** - Build and test automation - **`requirements.txt`** - Python dependencies (pyarrow, s3fs, boto3) @@ -128,6 +200,9 @@ make test # Run full tests (assumes server is already running) make test-with-server # Run full PyArrow test suite with server (small + large files) make test-quick # Run quick tests with small files only (assumes server is running) make test-implicit-dir-with-server # Run implicit directory tests with server +make test-native-s3 # Run PyArrow native S3 tests (assumes server is running) +make test-native-s3-with-server # Run PyArrow native S3 tests with server management +make test-sse-s3-compat # Run comprehensive SSE-S3 encryption compatibility tests # Server Management make start-seaweedfs-ci # Start SeaweedFS in background (CI mode) @@ -146,10 +221,20 @@ The tests are automatically run in GitHub Actions on every push/PR that affects **Test Matrix**: - Python versions: 3.9, 3.11, 3.12 -- PyArrow integration tests: 20 test combinations +- PyArrow integration tests (s3fs): 20 test combinations +- PyArrow native S3 tests: 6 test scenarios āœ… **NEW** +- SSE-S3 encryption tests: 5 file sizes āœ… **NEW** - Implicit directory fix tests: 6 test scenarios - Go unit tests: 17 test cases +**Test Steps** (run for each Python version): +1. Build SeaweedFS +2. Run PyArrow Parquet integration tests (`make test-with-server`) +3. Run implicit directory fix tests (`make test-implicit-dir-with-server`) +4. Run PyArrow native S3 filesystem tests (`make test-native-s3-with-server`) āœ… **NEW** +5. Run SSE-S3 encryption compatibility tests (`make test-sse-s3-compat`) āœ… **NEW** +6. Run Go unit tests for implicit directory handling + **Triggers**: - Push/PR to master (when `weed/s3api/**` or `weed/filer/**` changes) - Manual trigger via GitHub UI (workflow_dispatch) diff --git a/test/s3/parquet/example_pyarrow_native.py b/test/s3/parquet/example_pyarrow_native.py new file mode 100755 index 000000000..785ce0b45 --- /dev/null +++ b/test/s3/parquet/example_pyarrow_native.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +# /// script +# dependencies = [ +# "pyarrow>=22", +# "boto3>=1.28.0", +# ] +# /// + +""" +Simple example of using PyArrow's native S3 filesystem with SeaweedFS. + +This is a minimal example demonstrating how to write and read Parquet files +using PyArrow's built-in S3FileSystem without any additional dependencies +like s3fs. + +Usage: + # Set environment variables + export S3_ENDPOINT_URL=localhost:8333 + export S3_ACCESS_KEY=some_access_key1 + export S3_SECRET_KEY=some_secret_key1 + export BUCKET_NAME=test-parquet-bucket + + # Run the script + python3 example_pyarrow_native.py + + # Or run with uv (if available) + uv run example_pyarrow_native.py +""" + +import os +import secrets + +import pyarrow as pa +import pyarrow.dataset as pads +import pyarrow.fs as pafs +import pyarrow.parquet as pq + +from parquet_test_utils import create_sample_table + +# Configuration +BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket") +S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "localhost:8333") +S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "some_access_key1") +S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "some_secret_key1") + +# Determine scheme from endpoint +if S3_ENDPOINT_URL.startswith("http://"): + scheme = "http" + endpoint = S3_ENDPOINT_URL[7:] +elif S3_ENDPOINT_URL.startswith("https://"): + scheme = "https" + endpoint = S3_ENDPOINT_URL[8:] +else: + scheme = "http" # Default to http for localhost + endpoint = S3_ENDPOINT_URL + +print(f"Connecting to S3 endpoint: {scheme}://{endpoint}") + +# Initialize PyArrow's NATIVE S3 filesystem +s3 = pafs.S3FileSystem( + access_key=S3_ACCESS_KEY, + secret_key=S3_SECRET_KEY, + endpoint_override=endpoint, + scheme=scheme, + allow_bucket_creation=True, + allow_bucket_deletion=True, +) + +print("āœ“ Connected to S3 endpoint") + + +# Create bucket if needed (using boto3) +try: + import boto3 + from botocore.exceptions import ClientError + + s3_client = boto3.client( + 's3', + endpoint_url=f"{scheme}://{endpoint}", + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + region_name='us-east-1', + ) + + try: + s3_client.head_bucket(Bucket=BUCKET_NAME) + print(f"āœ“ Bucket exists: {BUCKET_NAME}") + except ClientError as e: + if e.response['Error']['Code'] == '404': + print(f"Creating bucket: {BUCKET_NAME}") + s3_client.create_bucket(Bucket=BUCKET_NAME) + print(f"āœ“ Bucket created: {BUCKET_NAME}") + else: + raise +except ImportError: + print("Warning: boto3 not available, assuming bucket exists") + +# Generate a unique filename +filename = f"{BUCKET_NAME}/dataset-{secrets.token_hex(8)}/test.parquet" + +print(f"\nWriting Parquet dataset to: {filename}") + +# Write dataset +table = create_sample_table(200_000) +pads.write_dataset( + table, + filename, + filesystem=s3, + format="parquet", +) + +print(f"āœ“ Wrote {table.num_rows:,} rows") + +# Read with pq.read_table +print("\nReading with pq.read_table...") +table_read = pq.read_table(filename, filesystem=s3) +print(f"āœ“ Read {table_read.num_rows:,} rows") + +# Read with pq.ParquetDataset +print("\nReading with pq.ParquetDataset...") +dataset = pq.ParquetDataset(filename, filesystem=s3) +table_dataset = dataset.read() +print(f"āœ“ Read {table_dataset.num_rows:,} rows") + +# Read with pads.dataset +print("\nReading with pads.dataset...") +dataset_pads = pads.dataset(filename, filesystem=s3) +table_pads = dataset_pads.to_table() +print(f"āœ“ Read {table_pads.num_rows:,} rows") + +print("\nāœ… All operations completed successfully!") +print(f"\nFile written to: {filename}") +print("You can verify the file using the SeaweedFS S3 API or weed shell") + diff --git a/test/s3/parquet/parquet_test_utils.py b/test/s3/parquet/parquet_test_utils.py new file mode 100644 index 000000000..d7e4c43db --- /dev/null +++ b/test/s3/parquet/parquet_test_utils.py @@ -0,0 +1,41 @@ +""" +Shared utility functions for PyArrow Parquet tests. + +This module provides common test utilities used across multiple test scripts +to avoid code duplication and ensure consistency. +""" + +import pyarrow as pa + + +def create_sample_table(num_rows: int = 5) -> pa.Table: + """Create a sample PyArrow table for testing. + + Args: + num_rows: Number of rows to generate (default: 5) + + Returns: + PyArrow Table with test data containing: + - id: int64 sequential IDs (0 to num_rows-1) + - name: string user names (user_0, user_1, ...) + - value: float64 values (id * 1.5) + - flag: bool alternating True/False based on even/odd id + + Example: + >>> table = create_sample_table(3) + >>> print(table) + pyarrow.Table + id: int64 + name: string + value: double + flag: bool + """ + return pa.table( + { + "id": pa.array(range(num_rows), type=pa.int64()), + "name": pa.array([f"user_{i}" for i in range(num_rows)], type=pa.string()), + "value": pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()), + "flag": pa.array([i % 2 == 0 for i in range(num_rows)], type=pa.bool_()), + } + ) + diff --git a/test/s3/parquet/test_pyarrow_native_s3.py b/test/s3/parquet/test_pyarrow_native_s3.py new file mode 100755 index 000000000..845e50950 --- /dev/null +++ b/test/s3/parquet/test_pyarrow_native_s3.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python3 +""" +Test script for PyArrow's NATIVE S3 filesystem with SeaweedFS. + +This test uses PyArrow's built-in S3FileSystem (pyarrow.fs.S3FileSystem) +instead of s3fs, providing a pure PyArrow solution for reading and writing +Parquet files to S3-compatible storage. + +Requirements: + - pyarrow>=10.0.0 + +Environment Variables: + S3_ENDPOINT_URL: S3 endpoint (default: localhost:8333) + S3_ACCESS_KEY: S3 access key (default: some_access_key1) + S3_SECRET_KEY: S3 secret key (default: some_secret_key1) + BUCKET_NAME: S3 bucket name (default: test-parquet-bucket) + TEST_QUICK: Run only small/quick tests (default: 0, set to 1 for quick mode) + +Usage: + # Run with default environment variables + python3 test_pyarrow_native_s3.py + + # Run with custom environment variables + S3_ENDPOINT_URL=localhost:8333 \ + S3_ACCESS_KEY=mykey \ + S3_SECRET_KEY=mysecret \ + BUCKET_NAME=mybucket \ + python3 test_pyarrow_native_s3.py +""" + +import os +import secrets +import sys +import logging +from typing import Optional + +import pyarrow as pa +import pyarrow.dataset as pads +import pyarrow.fs as pafs +import pyarrow.parquet as pq + +try: + import boto3 + from botocore.exceptions import ClientError + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +from parquet_test_utils import create_sample_table + +logging.basicConfig(level=logging.INFO, format="%(message)s") + +# Configuration from environment variables with defaults +S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "localhost:8333") +S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "some_access_key1") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "some_secret_key1") +BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket") +TEST_QUICK = os.getenv("TEST_QUICK", "0") == "1" + +# Create randomized test directory +TEST_RUN_ID = secrets.token_hex(8) +TEST_DIR = f"parquet-native-tests/{TEST_RUN_ID}" + +# Test file sizes +TEST_SIZES = { + "small": 5, + "large": 200_000, # This will create multiple row groups +} + +# Filter to only small tests if quick mode is enabled +if TEST_QUICK: + TEST_SIZES = {"small": TEST_SIZES["small"]} + logging.info("Quick test mode enabled - running only small tests") + + +def init_s3_filesystem() -> tuple[Optional[pafs.S3FileSystem], str, str]: + """Initialize PyArrow's native S3 filesystem. + + Returns: + tuple: (S3FileSystem instance, scheme, endpoint) + """ + try: + logging.info("Initializing PyArrow S3FileSystem...") + logging.info(f" Endpoint: {S3_ENDPOINT_URL}") + logging.info(f" Bucket: {BUCKET_NAME}") + + # Determine scheme from endpoint + if S3_ENDPOINT_URL.startswith("http://"): + scheme = "http" + endpoint = S3_ENDPOINT_URL[7:] # Remove http:// + elif S3_ENDPOINT_URL.startswith("https://"): + scheme = "https" + endpoint = S3_ENDPOINT_URL[8:] # Remove https:// + else: + # Default to http for localhost + scheme = "http" + endpoint = S3_ENDPOINT_URL + + # Enable bucket creation and deletion for testing + s3 = pafs.S3FileSystem( + access_key=S3_ACCESS_KEY, + secret_key=S3_SECRET_KEY, + endpoint_override=endpoint, + scheme=scheme, + allow_bucket_creation=True, + allow_bucket_deletion=True, + ) + + logging.info("āœ“ PyArrow S3FileSystem initialized successfully\n") + return s3, scheme, endpoint + except Exception: + logging.exception("āœ— Failed to initialize PyArrow S3FileSystem") + return None, "", "" + + +def ensure_bucket_exists_boto3(scheme: str, endpoint: str) -> bool: + """Ensure the test bucket exists using boto3.""" + if not HAS_BOTO3: + logging.error("boto3 is required for bucket creation") + return False + + try: + # Create boto3 client + endpoint_url = f"{scheme}://{endpoint}" + s3_client = boto3.client( + 's3', + endpoint_url=endpoint_url, + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + region_name='us-east-1', + ) + + # Check if bucket exists + try: + s3_client.head_bucket(Bucket=BUCKET_NAME) + logging.info(f"āœ“ Bucket exists: {BUCKET_NAME}") + return True + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == '404': + # Bucket doesn't exist, create it + logging.info(f"Creating bucket: {BUCKET_NAME}") + s3_client.create_bucket(Bucket=BUCKET_NAME) + logging.info(f"āœ“ Bucket created: {BUCKET_NAME}") + return True + else: + raise + except Exception: + logging.exception("āœ— Failed to create/check bucket") + return False + + +def ensure_bucket_exists(s3: pafs.S3FileSystem) -> bool: + """Ensure the test bucket exists using PyArrow's native S3FileSystem.""" + try: + # Check if bucket exists by trying to list it + try: + file_info = s3.get_file_info(BUCKET_NAME) + if file_info.type == pafs.FileType.Directory: + logging.info(f"āœ“ Bucket exists: {BUCKET_NAME}") + return True + except OSError as e: + # OSError typically means bucket not found or network/permission issues + error_msg = str(e).lower() + if "not found" in error_msg or "does not exist" in error_msg or "nosuchbucket" in error_msg: + logging.debug(f"Bucket '{BUCKET_NAME}' not found, will attempt creation: {e}") + else: + # Log other OSErrors (network, auth, etc.) for debugging + logging.debug(f"Error checking bucket '{BUCKET_NAME}', will attempt creation anyway: {type(e).__name__}: {e}") + except Exception as e: + # Catch any other unexpected exceptions and log them + logging.debug(f"Unexpected error checking bucket '{BUCKET_NAME}', will attempt creation: {type(e).__name__}: {e}") + + # Try to create the bucket + logging.info(f"Creating bucket: {BUCKET_NAME}") + s3.create_dir(BUCKET_NAME) + logging.info(f"āœ“ Bucket created: {BUCKET_NAME}") + return True + except Exception: + logging.exception(f"āœ— Failed to create/check bucket '{BUCKET_NAME}' with PyArrow") + return False + + +def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) -> tuple[bool, str]: + """Test writing and reading a Parquet dataset using PyArrow's native S3 filesystem.""" + try: + table = create_sample_table(num_rows) + + # Write using pads.write_dataset + filename = f"{BUCKET_NAME}/{TEST_DIR}/{test_name}/data.parquet" + logging.info(f" Writing {num_rows:,} rows to {filename}...") + + pads.write_dataset( + table, + filename, + filesystem=s3, + format="parquet", + ) + logging.info(" āœ“ Write completed") + + # Test Method 1: Read with pq.read_table + logging.info(" Reading with pq.read_table...") + table_read = pq.read_table(filename, filesystem=s3) + if table_read.num_rows != num_rows: + return False, f"pq.read_table: Row count mismatch (expected {num_rows}, got {table_read.num_rows})" + + # Check schema first + if not table_read.schema.equals(table.schema): + return False, f"pq.read_table: Schema mismatch (expected {table.schema}, got {table_read.schema})" + + # Sort both tables by 'id' column before comparison to handle potential row order differences + table_sorted = table.sort_by([('id', 'ascending')]) + table_read_sorted = table_read.sort_by([('id', 'ascending')]) + + if not table_read_sorted.equals(table_sorted): + # Provide more detailed error information + error_details = [] + for col_name in table.column_names: + col_original = table_sorted.column(col_name) + col_read = table_read_sorted.column(col_name) + if not col_original.equals(col_read): + error_details.append(f"column '{col_name}' differs") + return False, f"pq.read_table: Table contents mismatch ({', '.join(error_details)})" + logging.info(f" āœ“ pq.read_table: {table_read.num_rows:,} rows") + + # Test Method 2: Read with pq.ParquetDataset + logging.info(" Reading with pq.ParquetDataset...") + dataset = pq.ParquetDataset(filename, filesystem=s3) + table_dataset = dataset.read() + if table_dataset.num_rows != num_rows: + return False, f"pq.ParquetDataset: Row count mismatch (expected {num_rows}, got {table_dataset.num_rows})" + + # Sort before comparison + table_dataset_sorted = table_dataset.sort_by([('id', 'ascending')]) + if not table_dataset_sorted.equals(table_sorted): + error_details = [] + for col_name in table.column_names: + col_original = table_sorted.column(col_name) + col_read = table_dataset_sorted.column(col_name) + if not col_original.equals(col_read): + error_details.append(f"column '{col_name}' differs") + return False, f"pq.ParquetDataset: Table contents mismatch ({', '.join(error_details)})" + logging.info(f" āœ“ pq.ParquetDataset: {table_dataset.num_rows:,} rows") + + # Test Method 3: Read with pads.dataset + logging.info(" Reading with pads.dataset...") + dataset_pads = pads.dataset(filename, filesystem=s3) + table_pads = dataset_pads.to_table() + if table_pads.num_rows != num_rows: + return False, f"pads.dataset: Row count mismatch (expected {num_rows}, got {table_pads.num_rows})" + + # Sort before comparison + table_pads_sorted = table_pads.sort_by([('id', 'ascending')]) + if not table_pads_sorted.equals(table_sorted): + error_details = [] + for col_name in table.column_names: + col_original = table_sorted.column(col_name) + col_read = table_pads_sorted.column(col_name) + if not col_original.equals(col_read): + error_details.append(f"column '{col_name}' differs") + return False, f"pads.dataset: Table contents mismatch ({', '.join(error_details)})" + logging.info(f" āœ“ pads.dataset: {table_pads.num_rows:,} rows") + + return True, "All read methods passed" + + except Exception as exc: + logging.exception(" āœ— Test failed") + return False, f"{type(exc).__name__}: {exc}" + + +def cleanup_test_files(s3: pafs.S3FileSystem) -> None: + """Clean up test files from S3. + + Note: We cannot use s3.delete_dir() directly because SeaweedFS uses implicit + directories (path prefixes without physical directory objects). PyArrow's + delete_dir() attempts to delete the directory marker itself, which fails with + "INTERNAL_FAILURE" on SeaweedFS. Instead, we list and delete files individually, + letting implicit directories disappear automatically. + """ + try: + test_path = f"{BUCKET_NAME}/{TEST_DIR}" + logging.info(f"Cleaning up test directory: {test_path}") + + # List and delete files individually to handle implicit directories + try: + file_selector = pafs.FileSelector(test_path, recursive=True) + files = s3.get_file_info(file_selector) + + # Delete files first (not directories) + for file_info in files: + if file_info.type == pafs.FileType.File: + s3.delete_file(file_info.path) + logging.debug(f" Deleted file: {file_info.path}") + + logging.info("āœ“ Test directory cleaned up") + except OSError as e: + # Handle the case where the path doesn't exist or is inaccessible + if "does not exist" in str(e).lower() or "not found" in str(e).lower(): + logging.info("āœ“ Test directory already clean or doesn't exist") + else: + raise + except Exception: + logging.exception("Failed to cleanup test directory") + + +def main(): + """Run all tests with PyArrow's native S3 filesystem.""" + print("=" * 80) + print("PyArrow Native S3 Filesystem Tests for SeaweedFS") + print("Testing Parquet Files with Multiple Row Groups") + if TEST_QUICK: + print("*** QUICK TEST MODE - Small files only ***") + print("=" * 80 + "\n") + + print("Configuration:") + print(f" S3 Endpoint: {S3_ENDPOINT_URL}") + print(f" Access Key: {S3_ACCESS_KEY}") + print(f" Bucket: {BUCKET_NAME}") + print(f" Test Directory: {TEST_DIR}") + print(f" Quick Mode: {'Yes (small files only)' if TEST_QUICK else 'No (all file sizes)'}") + print(f" PyArrow Version: {pa.__version__}") + print() + + # Initialize S3 filesystem + s3, scheme, endpoint = init_s3_filesystem() + if s3 is None: + print("Cannot proceed without S3 connection") + return 1 + + # Ensure bucket exists - try PyArrow first, fall back to boto3 + bucket_created = ensure_bucket_exists(s3) + if not bucket_created: + logging.info("Trying to create bucket with boto3...") + bucket_created = ensure_bucket_exists_boto3(scheme, endpoint) + + if not bucket_created: + print("Cannot proceed without bucket") + return 1 + + results = [] + + # Test all file sizes + for size_name, num_rows in TEST_SIZES.items(): + print(f"\n{'='*80}") + print(f"Testing with {size_name} files ({num_rows:,} rows)") + print(f"{'='*80}\n") + + test_name = f"{size_name}_test" + success, message = test_write_and_read(s3, test_name, num_rows) + results.append((test_name, success, message)) + + status = "āœ“ PASS" if success else "āœ— FAIL" + print(f"\n{status}: {message}\n") + + # Summary + print("\n" + "=" * 80) + print("SUMMARY") + print("=" * 80) + passed = sum(1 for _, success, _ in results if success) + total = len(results) + print(f"\nTotal: {passed}/{total} passed\n") + + for test_name, success, message in results: + status = "āœ“" if success else "āœ—" + print(f" {status} {test_name}: {message}") + + print("\n" + "=" * 80) + if passed == total: + print("āœ“ ALL TESTS PASSED!") + else: + print(f"āœ— {total - passed} test(s) failed") + + print("=" * 80 + "\n") + + # Cleanup + cleanup_test_files(s3) + + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/test/s3/parquet/test_sse_s3_compatibility.py b/test/s3/parquet/test_sse_s3_compatibility.py new file mode 100755 index 000000000..534a6f814 --- /dev/null +++ b/test/s3/parquet/test_sse_s3_compatibility.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +Test script for SSE-S3 compatibility with PyArrow native S3 filesystem. + +This test specifically targets the SSE-S3 multipart upload bug where +SeaweedFS panics with "bad IV length" when reading multipart uploads +that were encrypted with bucket-default SSE-S3. + +Requirements: + - pyarrow>=10.0.0 + - boto3>=1.28.0 + +Environment Variables: + S3_ENDPOINT_URL: S3 endpoint (default: localhost:8333) + S3_ACCESS_KEY: S3 access key (default: some_access_key1) + S3_SECRET_KEY: S3 secret key (default: some_secret_key1) + BUCKET_NAME: S3 bucket name (default: test-parquet-bucket) + +Usage: + # Start SeaweedFS with SSE-S3 enabled + make start-seaweedfs-ci ENABLE_SSE_S3=true + + # Run the test + python3 test_sse_s3_compatibility.py +""" + +import os +import secrets +import sys +import logging +from typing import Optional + +import pyarrow as pa +import pyarrow.dataset as pads +import pyarrow.fs as pafs +import pyarrow.parquet as pq + +try: + import boto3 + from botocore.exceptions import ClientError + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + logging.exception("boto3 is required for this test") + sys.exit(1) + +from parquet_test_utils import create_sample_table + +logging.basicConfig(level=logging.INFO, format="%(message)s") + +# Configuration +S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "localhost:8333") +S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "some_access_key1") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "some_secret_key1") +BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket") + +TEST_RUN_ID = secrets.token_hex(8) +TEST_DIR = f"sse-s3-tests/{TEST_RUN_ID}" + +# Test sizes designed to trigger multipart uploads +# PyArrow typically uses 5MB chunks, so these sizes should trigger multipart +TEST_SIZES = { + "tiny": 10, # Single part + "small": 1_000, # Single part + "medium": 50_000, # Single part (~1.5MB) + "large": 200_000, # Multiple parts (~6MB) + "very_large": 500_000, # Multiple parts (~15MB) +} + + +def init_s3_filesystem() -> tuple[Optional[pafs.S3FileSystem], str, str]: + """Initialize PyArrow's native S3 filesystem.""" + try: + logging.info("Initializing PyArrow S3FileSystem...") + + # Determine scheme from endpoint + if S3_ENDPOINT_URL.startswith("http://"): + scheme = "http" + endpoint = S3_ENDPOINT_URL[7:] + elif S3_ENDPOINT_URL.startswith("https://"): + scheme = "https" + endpoint = S3_ENDPOINT_URL[8:] + else: + scheme = "http" + endpoint = S3_ENDPOINT_URL + + s3 = pafs.S3FileSystem( + access_key=S3_ACCESS_KEY, + secret_key=S3_SECRET_KEY, + endpoint_override=endpoint, + scheme=scheme, + allow_bucket_creation=True, + allow_bucket_deletion=True, + ) + + logging.info("āœ“ PyArrow S3FileSystem initialized\n") + return s3, scheme, endpoint + except Exception: + logging.exception("āœ— Failed to initialize PyArrow S3FileSystem") + return None, "", "" + + +def ensure_bucket_exists(scheme: str, endpoint: str) -> bool: + """Ensure the test bucket exists using boto3.""" + try: + endpoint_url = f"{scheme}://{endpoint}" + s3_client = boto3.client( + 's3', + endpoint_url=endpoint_url, + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + region_name='us-east-1', + ) + + try: + s3_client.head_bucket(Bucket=BUCKET_NAME) + logging.info(f"āœ“ Bucket exists: {BUCKET_NAME}") + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == '404': + logging.info(f"Creating bucket: {BUCKET_NAME}") + s3_client.create_bucket(Bucket=BUCKET_NAME) + logging.info(f"āœ“ Bucket created: {BUCKET_NAME}") + else: + logging.exception("āœ— Failed to access bucket") + return False + + # Note: SeaweedFS doesn't support GetBucketEncryption API + # so we can't verify if SSE-S3 is enabled via API + # We assume it's configured correctly in the s3.json config file + logging.info("āœ“ Assuming SSE-S3 is configured in s3.json") + return True + + except Exception: + logging.exception("āœ— Failed to check bucket") + return False + + +def test_write_read_with_sse( + s3: pafs.S3FileSystem, + test_name: str, + num_rows: int +) -> tuple[bool, str, int]: + """Test writing and reading with SSE-S3 encryption.""" + try: + table = create_sample_table(num_rows) + filename = f"{BUCKET_NAME}/{TEST_DIR}/{test_name}/data.parquet" + + logging.info(f" Writing {num_rows:,} rows...") + pads.write_dataset( + table, + filename, + filesystem=s3, + format="parquet", + ) + + logging.info(" Reading back...") + table_read = pq.read_table(filename, filesystem=s3) + + if table_read.num_rows != num_rows: + return False, f"Row count mismatch: {table_read.num_rows} != {num_rows}", 0 + + return True, "Success", table_read.num_rows + + except Exception as e: + error_msg = f"{type(e).__name__}: {e!s}" + logging.exception(" āœ— Failed") + return False, error_msg, 0 + + +def main(): + """Run SSE-S3 compatibility tests.""" + print("=" * 80) + print("SSE-S3 Compatibility Tests for PyArrow Native S3") + print("Testing Multipart Upload Encryption") + print("=" * 80 + "\n") + + print("Configuration:") + print(f" S3 Endpoint: {S3_ENDPOINT_URL}") + print(f" Bucket: {BUCKET_NAME}") + print(f" Test Directory: {TEST_DIR}") + print(f" PyArrow Version: {pa.__version__}") + print() + + # Initialize + s3, scheme, endpoint = init_s3_filesystem() + if s3 is None: + print("Cannot proceed without S3 connection") + return 1 + + # Check bucket and SSE-S3 + if not ensure_bucket_exists(scheme, endpoint): + print("\n⚠ WARNING: Failed to access or create the test bucket!") + print("This test requires a reachable bucket with SSE-S3 enabled.") + print("Please ensure SeaweedFS is running with: make start-seaweedfs-ci ENABLE_SSE_S3=true") + return 1 + + print() + results = [] + + # Test all sizes + for size_name, num_rows in TEST_SIZES.items(): + print(f"\n{'='*80}") + print(f"Testing {size_name} dataset ({num_rows:,} rows)") + print(f"{'='*80}") + + success, message, rows_read = test_write_read_with_sse( + s3, size_name, num_rows + ) + results.append((size_name, num_rows, success, message, rows_read)) + + if success: + print(f" āœ“ SUCCESS: Read {rows_read:,} rows") + else: + print(f" āœ— FAILED: {message}") + + # Summary + print("\n" + "=" * 80) + print("SUMMARY") + print("=" * 80) + + passed = sum(1 for _, _, success, _, _ in results if success) + total = len(results) + print(f"\nTotal: {passed}/{total} tests passed\n") + + print(f"{'Size':<15} {'Rows':>10} {'Status':<10} {'Rows Read':>10} {'Message':<40}") + print("-" * 90) + for size_name, num_rows, success, message, rows_read in results: + status = "āœ“ PASS" if success else "āœ— FAIL" + rows_str = f"{rows_read:,}" if success else "N/A" + print(f"{size_name:<15} {num_rows:>10,} {status:<10} {rows_str:>10} {message[:40]}") + + print("\n" + "=" * 80) + if passed == total: + print("āœ“ ALL TESTS PASSED WITH SSE-S3!") + print("\nThis means:") + print(" - SSE-S3 encryption is working correctly") + print(" - PyArrow native S3 filesystem is compatible") + print(" - Multipart uploads are handled properly") + else: + print(f"āœ— {total - passed} test(s) failed") + print("\nPossible issues:") + print(" - SSE-S3 multipart upload bug with empty IV") + print(" - Encryption/decryption mismatch") + print(" - File corruption during upload") + + print("=" * 80 + "\n") + + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) +