Source code for scfocus.utils

import streamlit as st
import scanpy as sc
import scfocus
import os
import tempfile
from io import BytesIO

@st.cache_data
def preprocess(_adata, n_top_genes):
    """
    Preprocess single-cell RNA-seq data in place using scanpy.

    Runs count normalization, log transformation, highly variable gene
    (HVG) selection and PCA, reporting progress through Streamlit
    spinners and success messages.

    Parameters
    ----------
    _adata : anndata.AnnData
        Annotated data matrix with cells as observations and genes as
        variables. It is modified in place; nothing is returned. The
        leading underscore tells Streamlit's cache not to hash this
        argument.
    n_top_genes : int
        Number of highly variable genes to identify (coerced with
        ``int``).

    Notes
    -----
    Steps, in order:

    1. Total count normalization to 10,000 counts per cell
    2. Log transformation (log1p)
    3. Highly variable gene identification
    4. PCA restricted to the highly variable genes

    All genes are kept in ``_adata``; the HVG restriction is applied only
    through the PCA mask. Earlier code rebound ``_adata`` to an HVG-only
    subset before PCA, which meant the PCA output was stored on a
    temporary object and never reached the caller.

    NOTE(review): because ``_adata`` is excluded from the cache key, the
    key is ``n_top_genes`` alone. A cache hit skips the body entirely, so
    a different AnnData passed with the same ``n_top_genes`` would not be
    preprocessed, and a new ``n_top_genes`` on the same object would
    normalize/log-transform it again — confirm how callers manage this.
    """
    with st.spinner("Normalizing total counts..."):
        sc.pp.normalize_total(_adata, target_sum=1e4)
    st.success("Normalization completed!")

    with st.spinner("Logarithmizing data..."):
        sc.pp.log1p(_adata)
    st.success("Logarithmizing completed!")

    with st.spinner("Selecting highly variable genes..."):
        sc.pp.highly_variable_genes(_adata, n_top_genes=int(n_top_genes))
    st.success("Highly variable genes selected!")

    with st.spinner("Running PCA..."):
        # Do NOT subset `_adata` first: rebinding the local name to a view
        # would leave the PCA results on a copy the caller never sees.
        # `mask_var` limits the computation to the HVGs while writing the
        # results onto the caller's object.
        sc.pp.pca(_adata, mask_var='highly_variable')
    st.success("PCA completed!")
@st.cache_data
def run_umap(_adata, n_neighbors, min_dist):
    """
    Build the neighborhood graph, run UMAP and return the coordinates.

    Parameters
    ----------
    _adata : anndata.AnnData
        Preprocessed annotated data matrix. The neighbor graph and the
        UMAP result are written onto this object. The leading underscore
        keeps Streamlit's cache from hashing it.
    n_neighbors : int
        Number of neighbors for the neighborhood graph (coerced with
        ``int``).
    min_dist : float
        Minimum distance parameter forwarded to UMAP.

    Returns
    -------
    embedding : numpy.ndarray
        Copy of ``_adata.obsm['X_umap']``; 2D UMAP coordinates with shape
        (n_cells, 2).

    Notes
    -----
    Results are cached by Streamlit.
    NOTE(review): the cache key does not include ``_adata``, so a new
    dataset with unchanged parameters would get the previously cached
    embedding — confirm callers clear the cache on new uploads.
    """
    with st.spinner("Computing neighbors..."):
        neighbor_count = int(n_neighbors)
        sc.pp.neighbors(_adata, n_neighbors=neighbor_count)

    with st.spinner("Computing UMAP embedding..."):
        sc.tl.umap(_adata, min_dist=min_dist)

    # Hand back a copy so the result is independent of the AnnData object.
    return _adata.obsm['X_umap'].copy()
@st.cache_data
def run_tsne(_adata, perplexity):
    """
    Run t-SNE on the data and return the resulting coordinates.

    Parameters
    ----------
    _adata : anndata.AnnData
        Preprocessed annotated data matrix. The t-SNE result is written
        onto this object. The leading underscore keeps Streamlit's cache
        from hashing it.
    perplexity : int
        Perplexity forwarded to t-SNE (coerced with ``int``).

    Returns
    -------
    embedding : numpy.ndarray
        Copy of ``_adata.obsm['X_tsne']``; 2D t-SNE coordinates with
        shape (n_cells, 2).

    Notes
    -----
    Results are cached by Streamlit to avoid recomputation across reruns.
    NOTE(review): the cache key does not include ``_adata`` — confirm
    callers clear the cache when the dataset changes.
    """
    with st.spinner("Computing t-SNE embedding..."):
        tsne_perplexity = int(perplexity)
        sc.tl.tsne(_adata, perplexity=tsne_perplexity)
    st.success("t-SNE completed!")

    # Hand back a copy so the result is independent of the AnnData object.
    return _adata.obsm['X_tsne'].copy()
@st.cache_data
def run_focus(_embedding, n=6, pct_samples=.01, meta_focusing=3):
    """
    Run the scFocus workflow on an embedding and return focus probabilities.

    Parameters
    ----------
    _embedding : numpy.ndarray
        2D embedding coordinates (e.g., from UMAP or t-SNE). The leading
        underscore keeps Streamlit's cache from hashing it.
    n : int, optional
        Number of parallel agents/branches to identify (default: 6).
    pct_samples : float, optional
        Percentage of samples to use in each training step
        (default: 0.01).
    meta_focusing : int, optional
        Number of meta-focusing iterations (default: 3).

    Returns
    -------
    focus_probs : numpy.ndarray
        First entry of the merged focus patterns (``mfp[0]``); a matrix of
        focus probabilities with shape (n_cells, n_branches).

    Notes
    -----
    The sequence is: construct ``scfocus.focus``, call ``meta_focusing``,
    call ``merge_fp2``, then read ``mfp[0]``. Results are cached by
    Streamlit.
    NOTE(review): the cache key covers only ``n``, ``pct_samples`` and
    ``meta_focusing`` — confirm callers clear the cache when the embedding
    changes.
    """
    with st.spinner("Running scFocus analysis..."):
        model = scfocus.focus(_embedding, n=n, pct_samples=pct_samples)
        # Keep using whatever meta_focusing returns, as the chained call did.
        focused = model.meta_focusing(n=meta_focusing)
        focused.merge_fp2()
    st.success("scFocus analysis completed!")

    merged_patterns = focused.mfp
    return merged_patterns[0]
@st.cache_data
def read_files(uploaded_files):
    """
    Read uploaded single-cell data files and return an AnnData object.

    Supported inputs:

    - A single ``.h5ad`` file
    - 10x Genomics format (matrix, features and barcodes files)

    Parameters
    ----------
    uploaded_files : list or None
        Uploaded file objects from the Streamlit file uploader. ``None``
        and an empty list are both treated as "nothing uploaded".

    Returns
    -------
    adata : anndata.AnnData or None
        Annotated data matrix if loading succeeded, ``None`` otherwise.
        Every failure path reports the reason through ``st.error``
        (directly here or inside the reader helpers).

    Notes
    -----
    With exactly one file the upload is passed to ``read_uploaded_file``.
    With more than one file, the 10x files are picked by case-insensitive
    substring match on the file name (``matrix``, ``features``,
    ``barcodes``); all three must be present. The first match for each
    role wins. Results are cached by Streamlit.
    """
    # Covers both None and an empty list; previously None crashed on len().
    if not uploaded_files:
        st.error("No files uploaded.")
        return None

    if len(uploaded_files) == 1:
        with st.spinner("Loading file..."):
            adata = read_uploaded_file(uploaded_files[0])
        if adata is not None:
            st.success("File loaded successfully.")
            st.write(adata)
        return adata

    def _first_named(keyword):
        """Return the first upload whose lowercased name contains keyword."""
        return next(
            (f for f in uploaded_files if keyword in f.name.lower()),
            None,
        )

    mtx_file = _first_named('matrix')
    features_file = _first_named('features')
    barcodes_file = _first_named('barcodes')

    # Explicit None checks: presence, not truthiness, is what matters here.
    if mtx_file is None or features_file is None or barcodes_file is None:
        st.error(
            "Please upload all required 10x Genomics files: "
            "`matrix.mtx`/`matrix.mtx.gz`, `features.tsv`/`features.tsv.gz`, "
            "and `barcodes.tsv`/`barcodes.tsv.gz`."
        )
        return None

    with st.spinner("Loading 10x Genomics data..."):
        adata = read_10x_files(mtx_file, features_file, barcodes_file)
    # On failure read_10x_files has already shown the error message.
    if adata is not None:
        st.success("10x Genomics files loaded successfully.")
        st.write(adata)
    return adata
def read_uploaded_file(uploaded_file):
    """
    Load one uploaded file into an AnnData object.

    Parameters
    ----------
    uploaded_file : UploadedFile
        Uploaded file object from Streamlit; its ``name`` and ``read()``
        are used.

    Returns
    -------
    adata : anndata.AnnData or None
        Annotated data matrix if successful, ``None`` otherwise.

    Notes
    -----
    The file type is the lowercased text after the last ``.`` in the file
    name (the whole name when there is no dot). Only ``h5ad`` is
    supported; any other type, and any exception raised while reading, is
    reported with ``st.error`` and yields ``None``.
    """
    # Text after the last dot; equals the full name if no dot is present.
    file_type = uploaded_file.name.rpartition('.')[2].lower()

    try:
        if file_type != 'h5ad':
            st.error(f"Unsupported file type: `{file_type}`")
            return None

        # Wrap the uploaded bytes in an in-memory buffer for the reader.
        payload = BytesIO(uploaded_file.read())
        return sc.read_h5ad(payload)
    except Exception as e:
        st.error(f"Failed to read `{file_type}` file: {e}")
        return None
def read_10x_files(mtx_file, features_file, barcodes_file):
    """
    Read 10x Genomics files (compressed or uncompressed) into an AnnData.

    Parameters
    ----------
    mtx_file : UploadedFile
        Matrix file (matrix.mtx or matrix.mtx.gz).
    features_file : UploadedFile
        Features/genes file (features.tsv or features.tsv.gz).
    barcodes_file : UploadedFile
        Barcodes file (barcodes.tsv or barcodes.tsv.gz).

    Returns
    -------
    adata : anndata.AnnData or None
        Annotated data matrix if successful, ``None`` otherwise. Any
        failure (missing argument, I/O error, parse error) is reported
        with ``st.error`` rather than raised.

    Notes
    -----
    Each upload is written into a temporary directory under its canonical
    10x name (``matrix.mtx``, ``features.tsv``, ``barcodes.tsv``), with
    ``.gz`` appended when the uploaded name ends in ``.gz``. The uploaded
    names themselves are never used as paths, for two reasons:

    * they come from the client and could contain path separators;
    * ``sc.read_10x_mtx`` looks for the canonical names, so differently
      named or prefixed uploads would otherwise not be found.

    The temporary directory is removed when reading finishes. scanpy's
    on-disk cache is disabled because the source path is a throwaway
    directory, so a cache entry for it could never be reused.
    """
    # (upload, canonical base name) for each required role.
    uploads = (
        (mtx_file, 'matrix.mtx'),
        (features_file, 'features.tsv'),
        (barcodes_file, 'barcodes.tsv'),
    )

    try:
        for uploaded, canonical in uploads:
            if uploaded is None:
                raise ValueError(f"Missing required file: `{canonical}`")

        with tempfile.TemporaryDirectory() as tmpdirname:
            for uploaded, canonical in uploads:
                # Preserve only the compression suffix from the upload name.
                suffix = '.gz' if uploaded.name.lower().endswith('.gz') else ''
                target_path = os.path.join(tmpdirname, canonical + suffix)
                with open(target_path, 'wb') as f:
                    f.write(uploaded.read())

            # Must run inside the `with`: the files vanish on exit.
            adata = sc.read_10x_mtx(
                tmpdirname,
                var_names='gene_symbols',
                cache=False,
            )
        return adata
    except Exception as e:
        # UI boundary: surface the problem to the user instead of crashing.
        st.error(f"Failed to read 10x Genomics files: {e}")
        return None