import os
import sys
import numpy as np
from tensorflow.keras.activations import linear
import requests
from PIL import Image
from tensorflow.keras.preprocessing import image as keras_image
from tensorflow.keras.applications.vgg16 import preprocess_input
def get_examples_data_dir():
"""
Get the path to the examples/data directory regardless of current working directory
Returns:
str: Path to the examples/data directory
"""
# Try to find examples/data directory
current_dir = os.getcwd()
# Check if we're in examples/tutorials/tensorflow
if os.path.basename(current_dir) == 'tensorflow' and \
os.path.basename(os.path.dirname(current_dir)) == 'tutorials':
return os.path.join(os.path.dirname(os.path.dirname(current_dir)), 'data')
# Check if we're in examples/tutorials
if os.path.basename(current_dir) == 'tutorials' and \
os.path.basename(os.path.dirname(current_dir)) == 'examples':
return os.path.join(os.path.dirname(current_dir), 'data')
# Check if we're in examples
if os.path.basename(current_dir) == 'examples':
return os.path.join(current_dir, 'data')
# Check if we're in project root
if os.path.exists(os.path.join(current_dir, 'examples', 'data')):
return os.path.join(current_dir, 'examples', 'data')
# Last resort: try to find examples/data relative to script location
script_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
examples_data_dir = os.path.join(script_dir, 'examples', 'data')
if os.path.exists(examples_data_dir):
return examples_data_dir
# If we couldn't find it, create it in the current directory
os.makedirs(os.path.join(current_dir, 'data'), exist_ok=True)
return os.path.join(current_dir, 'data')
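

# Usage sketch (illustrative, not part of the original module): resolve the shared
# data directory and build the path for the tutorial's example image. The file
# name 'example.jpg' is an assumption taken from download_image() below.
#
#     data_dir = get_examples_data_dir()
#     img_path = os.path.join(data_dir, 'example.jpg')
#     download_image(img_path)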
def remove_softmax(model):
    """
    Replace the final layer's softmax activation with a linear activation.
    Args:
        model: TensorFlow/Keras model whose last layer uses softmax
    Returns:
        The model with a linear output activation
    """
    # Remove the last layer's softmax so explanations are computed on the logits.
    # Note: depending on the Keras version, the model may need to be re-built
    # (e.g. cloned) before the changed activation takes effect.
    model.layers[-1].activation = linear
    return model
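

# Usage sketch (illustrative; assumes ImageNet weights can be downloaded through
# tensorflow.keras.applications): strip the softmax so relevance methods operate
# on the pre-softmax logits.
#
#     from tensorflow.keras.applications.vgg16 import VGG16
#     model = remove_softmax(VGG16(weights='imagenet'))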
def calculate_explanation_innvestigate(model, x, method='lrp.epsilon', neuron_selection=None, batchmode=False,
**kwargs):
"""
Calculate an explanation using the innvestigate backend
Args:
model: TensorFlow/Keras model
x: Input to explain
method: Name of the method to use
neuron_selection: Index of the neuron to explain (default: None)
batchmode: Whether to process a batch of inputs
**kwargs: Additional arguments for the method
Returns:
Explanation (relevance map)
"""
# Import here to avoid circular imports
from signxai.tf_signxai.methods.innvestigate.analyzer import create_analyzer
# Create the analyzer
analyzer = create_analyzer(method, model, **kwargs)
# Ensure neuron_selection is a valid format that innvestigate accepts
# Valid values: None, 'all', 'max_activation', <int>, <list>, <one-dimensional array>
if neuron_selection is not None:
# Convert to native Python int to ensure compatibility
try:
if hasattr(neuron_selection, 'numpy'):
# It's a TensorFlow tensor
neuron_selection = int(neuron_selection.numpy())
print(f" DEBUG: Converted tensor to integer: {neuron_selection}")
elif isinstance(neuron_selection, np.ndarray):
# It's a numpy array
neuron_selection = int(neuron_selection.item())
print(f" DEBUG: Converted numpy array to integer: {neuron_selection}")
elif isinstance(neuron_selection, (int, np.integer)):
# Ensure it's a native Python int, not numpy int
neuron_selection = int(neuron_selection)
print(f" DEBUG: Using neuron selection as native Python int: {neuron_selection}")
else:
# Try a simple cast
neuron_selection = int(neuron_selection)
print(f" DEBUG: Forced cast to integer: {neuron_selection}")
        except Exception:
            # If the value cannot be cast to int, fall back to 'max_activation'
            print(f" DEBUG: Could not convert neuron_selection '{neuron_selection}', using 'max_activation'")
            neuron_selection = 'max_activation'
else:
# Default behavior for None
neuron_selection = 'max_activation'
print(f" DEBUG: Using default neuron selection: 'max_activation'")
# Make sure x is a numpy array
if not isinstance(x, np.ndarray):
x = np.array(x)
# Validate input shape for iNNvestigate
if x.ndim < 2 or x.ndim > 4:
raise ValueError(f"Invalid input dimensions for iNNvestigate: {x.ndim}D. Expected 2D-4D.")
# For 4D inputs, ensure they have the expected structure (batch, height, width, channels)
if x.ndim == 4 and x.shape[0] != 1:
print(f" WARNING: Unexpected batch size {x.shape[0]}, taking first sample only")
x = x[0:1] # Take only first sample
print(f" DEBUG: Input to analyzer: shape={x.shape}, ndim={x.ndim}, dtype={x.dtype}")
# Use similar format to original implementation
if not batchmode:
try:
# Always use input as-is for iNNvestigate - it should handle batch dimensions correctly
# The input x should already have proper batch dimension (1, H, W, C) from comparison script
ex = analyzer.analyze(X=x, neuron_selection=neuron_selection, **kwargs)
# Handle the returned explanation properly
if isinstance(ex, dict):
# Some analyzers return a dict - get the first value
                expl = next(iter(ex.values()))
# If it has batch dimension and we only have one sample, take the first
if isinstance(expl, np.ndarray) and expl.shape[0] == 1:
expl = expl[0]
elif isinstance(expl, list) and len(expl) == 1:
expl = expl[0]
else:
# Direct array return - handle batch dimension properly
if isinstance(ex, np.ndarray) and ex.shape[0] == 1:
expl = ex[0] # Remove batch dimension for single sample
elif isinstance(ex, list) and len(ex) == 1:
expl = ex[0]
else:
expl = ex
return np.asarray(expl)
except Exception as e:
print(f" DEBUG: First analysis failed: {e}")
# The first attempt failed, likely due to dimension issues
# Don't retry with the same problematic approach - just fail fast
error_message = f"iNNvestigate analysis failed with input shape {x.shape}: {e}"
print(f" DEBUG: {error_message}")
            raise ValueError(error_message) from e
else:
# Batch mode
try:
# For timeseries data in batch mode, shape handling is critical
# Print debug info about input shape
print(f" DEBUG: Batch mode input shape: {x.shape}")
# Handle possibly missing batch dimension
if x.ndim == 3 and x.shape[0] != 1:
# This is already in the right format (probably multiple samples)
x_batch = x
elif x.ndim == 2:
# Single timeseries without channels, add batch and channel dims
x_batch = np.expand_dims(np.expand_dims(x, axis=0), axis=-1)
print(f" DEBUG: Adjusted single timeseries shape to: {x_batch.shape}")
else:
# Keep as is
x_batch = x
# Original approach with adjusted input
ex = analyzer.analyze(X=x_batch, neuron_selection=neuron_selection, **kwargs)
# Return all examples
if isinstance(ex, dict):
                expl = next(iter(ex.values()))
else:
expl = ex
return np.asarray(expl)
except Exception as e:
error_message = f"Error in innvestigate batch analysis: {e}. Input shape: {x.shape}"
print(f" DEBUG: {error_message}")
            raise ValueError(error_message) from e
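

# Usage sketch (illustrative; the method name is the module default, and the
# single-image input shape is an assumption): explain the predicted class of a
# preprocessed image with LRP-epsilon.
#
#     preds = model.predict(x)                     # x with shape (1, 224, 224, 3)
#     target = int(np.argmax(preds[0]))
#     relevance = calculate_explanation_innvestigate(
#         model, x, method='lrp.epsilon', neuron_selection=target)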
def load_image(img_path, target_size=(224, 224), expand_dims=False, use_original_preprocessing=True):
"""
Load an image from a file path and preprocess it for VGG16.
Args:
img_path: Path to the image file
target_size: Size to resize the image to (default: (224, 224))
expand_dims: Whether to add a batch dimension
use_original_preprocessing: If True, use the original preprocessing from SIGN-XAI
Returns:
Tuple of (original image, preprocessed image)
"""
# Load image
    img = Image.open(img_path).convert('RGB')  # ensure 3 channels even for grayscale/RGBA files
    img = img.resize(target_size)
# Preprocess image for the network
x = keras_image.img_to_array(img)
if use_original_preprocessing:
# This is the original preprocessing from SIGN-XAI
# Ensure we use float32 for consistency
x = x.astype(np.float32)
# 'RGB'->'BGR' - Create a copy to avoid stride issues
x = x.copy()
# Swap the R and B channels manually
r_channel = x[..., 0].copy()
b_channel = x[..., 2].copy()
x[..., 0] = b_channel
x[..., 2] = r_channel
        # Zero-centering using the ImageNet mean pixel values (BGR order, matching the channel swap above)
mean = [103.939, 116.779, 123.68]
x[..., 0] -= mean[0]
x[..., 1] -= mean[1]
x[..., 2] -= mean[2]
    else:
        # Use TensorFlow's built-in preprocessing
        if expand_dims:
            x = np.expand_dims(x, axis=0)
            x = preprocess_input(x)
        else:
            # Preprocess with a temporary batch dimension, then strip it again
            x_expanded = np.expand_dims(x, axis=0)
            x_processed = preprocess_input(x_expanded)
            x = x_processed[0]
# Add batch dimension if requested (for original preprocessing)
if expand_dims and use_original_preprocessing:
x = np.expand_dims(x, axis=0)
return img, x
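

# Usage sketch (illustrative; the image path is an assumption based on
# get_examples_data_dir() and download_image()): load the example image with the
# original SIGN-XAI preprocessing and keep the batch dimension for prediction.
#
#     img_path = os.path.join(get_examples_data_dir(), 'example.jpg')
#     original_img, x = load_image(img_path, expand_dims=True)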
def download_image(path):
"""
Download example image if it doesn't exist
Args:
path: Path to save the image
"""
if not os.path.exists(path):
# Create directory if it doesn't exist
directory = os.path.dirname(path)
        if directory and not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
# Example image URL
url = "https://raw.githubusercontent.com/nilsgumpfer/SIGN-experiment-resources/main/example.jpg"
        response = requests.get(url, timeout=60)
        response.raise_for_status()
with open(path, 'wb') as f:
f.write(response.content)
def download_model(path):
"""
Download example model if it doesn't exist
Args:
path: Path to save the model
"""
if not os.path.exists(path):
# Create directory if it doesn't exist
directory = os.path.dirname(path)
        if directory and not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
# Example model URL
url = "https://raw.githubusercontent.com/nilsgumpfer/SIGN-experiment-resources/main/model.h5"
        response = requests.get(url, timeout=60)
        response.raise_for_status()
with open(path, 'wb') as f:
f.write(response.content)
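

# Usage sketch (illustrative; the target file names are assumptions): fetch the
# example resources once into the shared data directory before running a tutorial.
#
#     data_dir = get_examples_data_dir()
#     download_image(os.path.join(data_dir, 'example.jpg'))
#     download_model(os.path.join(data_dir, 'model.h5'))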
def aggregate_and_normalize_relevancemap_rgb(relevancemap):
"""
    Aggregate and normalize an RGB relevance map by summing over the channel axis
Args:
relevancemap: RGB relevance map
Returns:
Normalized relevance map
"""
# Aggregate channels
if relevancemap.ndim == 3:
relevancemap = np.sum(relevancemap, axis=2)
return normalize_heatmap(relevancemap)
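

# Usage sketch (illustrative; assumes matplotlib is imported as plt and
# 'relevance' is an (H, W, 3) relevance map, e.g. from
# calculate_explanation_innvestigate): collapse the channels and plot the result
# with a diverging colormap.
#
#     heatmap = aggregate_and_normalize_relevancemap_rgb(relevance)
#     plt.imshow(heatmap, cmap='seismic', clim=(-1, 1))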
def normalize_heatmap(heatmap):
"""
Normalize a heatmap to the range [-1, 1]
Args:
heatmap: Heatmap to normalize
Returns:
Normalized heatmap
"""
if heatmap.min() != heatmap.max():
max_abs = np.max(np.abs(heatmap))
if max_abs > 0:
heatmap = heatmap / max_abs
return np.nan_to_num(heatmap, nan=0)
else:
return np.zeros_like(heatmap)
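

# Usage sketch (illustrative): normalization divides by the largest absolute
# value, so the sign of the relevance is preserved.
#
#     normalize_heatmap(np.array([-2.0, 0.0, 4.0]))   # -> array([-0.5, 0., 1.])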