Source code for pyspark_util.columns

import re
from functools import reduce
import pyspark.sql.functions as F


def _validate_columns(df, columns):
    """
    Raises a ValueError if the given dataframe doesn't contains columns with the given names.

    Parameters
    ----------
    df : dataframe
        dataframe to check.
    columns : list of str
        columns to check.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the given dataframe doesn't contain columns with the given names.

    Examples
    --------
    >>> data = [(1, 2, 3)]
    >>> columns = ['a', 'b', 'c']
    >>> df = spark.createDataFrame(data, columns)
    >>> _validate_columns(df, 'a')
    >>> _validate_columns(df, 'd')  # doctest: +ELLIPSIS
    Traceback (most recent call last):
        ...
    ValueError: The given dataframe does not contain ['d'].
    """
    not_found = [c for c in columns if c not in df.columns]
    if len(not_found) > 0:
        raise ValueError('The given dataframe does not contain {}.'.format(not_found))


def prefix_columns(df, prefix, sep='_', exclude=None):
    """
    Prefix dataframe columns.

    Parameters
    ----------
    df : dataframe
        dataframe to be prefixed.
    prefix : str
        string to add before each column.
    sep : str, default '_'
        separator to join ``prefix`` and each column with.
    exclude : list of str, optional
        A selection of columns to exclude from being prefixed.
        ``None`` (the default) excludes nothing.

    Returns
    -------
    dataframe
        dataframe with prefixed columns.

    Raises
    ------
    ValueError
        If ``exclude`` contains columns that don't exist in the given dataframe.

    Examples
    --------
    >>> data = [(1, 2, 3)]
    >>> columns = ['a', 'b', 'c']
    >>> df = spark.createDataFrame(data, columns)
    >>> df.show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.prefix_columns(df, 'x').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |x_a|x_b|x_c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.prefix_columns(df, 'x', sep='|').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |x|a|x|b|x|c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.prefix_columns(df, 'x', exclude=['b', 'c']).show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |x_a|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+
    """
    # Avoid a mutable default argument; None stands for "exclude nothing".
    if exclude is None:
        exclude = []
    _validate_columns(df, exclude)

    new_names = [c if c in exclude else prefix + sep + c for c in df.columns]
    # Rename by position with toDF rather than resolving every name through
    # F.col()/select(): names containing characters such as '.' would
    # otherwise be parsed as nested-field references.
    return df.toDF(*new_names)
def suffix_columns(df, suffix, sep='_', exclude=None):
    """
    Suffix dataframe columns.

    Parameters
    ----------
    df : dataframe
        dataframe to be suffixed.
    suffix : str
        string to add after each column.
    sep : str, default '_'
        separator to join each column and ``suffix`` with.
    exclude : list of str, optional
        A selection of columns to exclude from being suffixed.
        ``None`` (the default) excludes nothing.

    Returns
    -------
    dataframe
        dataframe with suffixed columns.

    Raises
    ------
    ValueError
        If ``exclude`` contains columns that don't exist in the given dataframe.

    Examples
    --------
    >>> data = [(1, 2, 3)]
    >>> columns = ['a', 'b', 'c']
    >>> df = spark.createDataFrame(data, columns)
    >>> df.show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.suffix_columns(df, 'x').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |a_x|b_x|c_x|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.suffix_columns(df, 'x', sep='|').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |a|x|b|x|c|x|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.suffix_columns(df, 'x', exclude=['b', 'c']).show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |a_x|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+
    """
    # Avoid a mutable default argument; None stands for "exclude nothing".
    if exclude is None:
        exclude = []
    _validate_columns(df, exclude)

    new_names = [c if c in exclude else c + sep + suffix for c in df.columns]
    # Rename by position with toDF rather than resolving every name through
    # F.col()/select(): names containing characters such as '.' would
    # otherwise be parsed as nested-field references.
    return df.toDF(*new_names)
def rename_columns(df, mapper):
    """
    Rename dataframe columns.

    All renames are applied in a single pass, so a new name that coincides
    with another column's old name (e.g. swapping two columns) does not
    affect the result.

    Parameters
    ----------
    df : dataframe
        dataframe to be renamed.
    mapper : dict
        dictionary with old name as keys and new name as values.

    Returns
    -------
    dataframe
        dataframe with renamed columns.

    Raises
    ------
    ValueError
        If ``mapper`` contains columns that don't exist in the given dataframe.

    Examples
    --------
    >>> data = [(1, 2, 3)]
    >>> columns = ['a', 'b', 'c']
    >>> df = spark.createDataFrame(data, columns)
    >>> df.show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.rename_columns(df, {'a': 'x'}).show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |  x|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+

    >>> psu.rename_columns(df, {'a': 'b', 'b': 'a'}).show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+---+
    |  b|  a|  c|
    +---+---+---+
    |  1|  2|  3|
    +---+---+---+
    """
    _validate_columns(df, list(mapper))

    # Compute every new name from the ORIGINAL column list. Chaining
    # withColumnRenamed applies the renames one after another, so a mapping
    # such as {'a': 'b', 'b': 'c'} would first create a second 'b' column and
    # then rename both of them to 'c'.
    new_names = [mapper.get(c, c) for c in df.columns]
    return df.toDF(*new_names)
def select_columns_regex(df, regex):
    """
    Keep only the columns whose name matches a regular expression.

    A column is kept when ``re.search`` finds the pattern anywhere in its
    name; the original column order is preserved.

    Parameters
    ----------
    df : dataframe
        dataframe to be selected from.
    regex : str
        regular expression.

    Returns
    -------
    dataframe
        dataframe with matched columns

    Examples
    --------
    >>> data = [(1, 2)]
    >>> columns = ['abc', '123']
    >>> df = spark.createDataFrame(data, columns)
    >>> df.show()  # doctest: +NORMALIZE_WHITESPACE
    +---+---+
    |abc|123|
    +---+---+
    |  1|  2|
    +---+---+

    >>> psu.select_columns_regex(df, r'[a-z]+').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+
    |abc|
    +---+
    |  1|
    +---+

    >>> psu.select_columns_regex(df, r'[0-9]+').show()  # doctest: +NORMALIZE_WHITESPACE
    +---+
    |123|
    +---+
    |  2|
    +---+
    """
    matched = []
    for name in df.columns:
        # re.search returns None when the pattern occurs nowhere in the name.
        if re.search(regex, name) is not None:
            matched.append(name)
    return df.select(matched)