Elasticsearch tool for easily collecting and batch inserting Python data and pandas DataFrames

Overview

ElasticBatch

Elasticsearch buffer for collecting and batch inserting Python data and pandas DataFrames

Build Status Coverage Status PyPI - Python Version

Overview

ElasticBatch makes it easy to efficiently insert batches of data in the form of Python dictionaries or pandas DataFrames into Elasticsearch. An efficient pattern when processing data bound for Elasticsearch is to collect data records ("documents") in a buffer to be bulk-inserted in batches. ElasticBatch provides this functionality to ease the overhead and reduce the code involved in inserting large batches or streams of data into Elasticsearch.

ElasticBatch has been tested with Elasticsearch 7.x, but should work with earlier versions.

Features

ElasticBatch implements the following features (see Usage for examples and more details) that allow a user to:

  • Work with documents as lists of dicts or as rows of pandas DataFrames
  • Add documents to a buffer that will automatically flush (insert its contents to Elasticsearch) when it is full
  • Interact with an intuitive interface that handles all of the underlying Elasticsearch client logic on behalf of the user
  • Track the elapsed time a document has been in the buffer, allowing a user to flush the buffer at a desired time interval even when it is not full
  • Work within a context manager that will automatically flush before exiting, alleviating the need for extra code to ensure all documents are written to the database
  • Optionally dump the buffer contents (documents) to a file before exiting due to an uncaught exception
  • Automatically add Elasticsearch metadata fields (e.g., _index, _id) to each document via user-supplied functions

Installation

This package is hosted on PyPI and can be installed via pip:

  • To install with the ability to process pandas DataFrames:
    $ pip install elasticbatch[pandas]
    
  • For a more lightweight installation with only the ability to process native Python dicts:
    $ pip install elasticbatch
    

The only dependency of the latter is elasticsearch whereas the former will also install pandas as a dependency.

To instead install from source:

$ git clone https://github.com/dkaslovsky/ElasticBatch.git
$ cd ElasticBatch
$ pip install ".[pandas]"

To install from source without the pandas dependency, replace the last line above with

$ pip install .

Usage

Basic Usage

Start by importing the ElasticBuffer class:

>>> from elasticbatch import ElasticBuffer

ElasticBuffer uses sensible defaults when initialized without parameters:

>>> esbuf = ElasticBuffer()

Alternatively, one can pass any of the following parameters:

  • size: (int) number of documents the buffer can hold before flushing to Elasticsearch; defaults to 5000.
  • client_kwargs: (dict) configuration passed to the underlying elasticsearch.Elasticsearch client; see the Elasticsearch documentation for all available options.
  • bulk_kwargs: (dict) configuration passed to the underlying call to elasticsearch.helpers.bulk for bulk insertion; see the Elasticsearch documentation for all available options.
  • verbose_errs: (bool) whether verbose (True, default) or truncated (False) exceptions are raised; see Exception Handling for more details.
  • dump_dir: (str) directory to write buffer contents when exiting context due to raised Exception; defaults to None for not writing to file.
  • **metadata_funcs: (callable) functions to apply to each document for adding Elasticsearch metadata.; see Automatic Elasticsearch Metadata Fields for more details.

Once initialized, ElasticBuffer exposes two methods, add and flush. Use add to add documents to the buffer, noting that all documents in the buffer will be flushed and inserted into Elasticsearch once the number of documents exceeds the buffer's size:

>>> docs = [
        {'_index': 'my-index', 'a': 1, 'b': 2.1, 'c': 'xyz'},
        {'_index': 'my-index', 'a': 3, 'b': 4.1, 'c': 'xyy'},
        {'_index': 'my-other-index', 'a': 5, 'b': 6.1, 'c': 'zzz'},
        {'_index': 'my-other-index', 'a': 7, 'b': 8.1, 'c': 'zyx'},
    ]
>>> esbuf.add(docs)

Note that all metadata fields required for indexing into Elasticsearch (e.g., _index above) must either be included in each document or added programmatically via callable kwarg parameters supplied to the ElasticBuffer instance (see below).

To manually force a buffer flush and insert all documents to Elasticsearch, use the flush method which does not accept any arguments:

>>> esbuf.flush()

A third method, show(), exists mostly for debug purposes and prints all documents currently in the buffer as newline-delimited json.

pandas DataFrames

One can directly insert a pandas DataFrame into the buffer and each row will be treated as a document:

>>> import pandas as pd
>>> df = pd.DataFrame(docs)
>>> print(df)

           _index  a    b    c
0        my-index  1  2.1  xyz
1        my-index  3  4.1  xyy
2  my-other-index  5  6.1  zzz
3  my-other-index  7  8.1  zyx

>>> esbuf.add(df)

The DataFrame's index (referring to df.index and not the column named _index) is ignored unless it is named, in which case it is added as an ordinary field (column).

Context Manager

ElasticBuffer can also be used as a context manager, offering the advantages of automatically flushing the remaining buffer contents when exiting scope as well as optionally dumping the buffer contents to a file before exiting due to an unhandled exception.

>>> with ElasticBuffer(size=100, dump_dir='/tmp') as esbuf:
       for doc in document_stream:
           doc = process_document(doc)  # some user-defined application-specific processing function
           esbuf.add(doc)

Elapsed Time

When using ElasticBuffer in a service consuming messages from some external source, it can be important to track how long messages have been waiting in the buffer to be flushed. In particular, a user may wish to flush, say, every hour to account for the situation where only a trickle of data is coming in and the buffer is not filling up. ElasticBuffer provides the elapsed time (in seconds) that its oldest message has been in the buffer:

>>> esbuf.oldest_elapsed_time

5.687833070755005  # the oldest message was inserted ~5.69 seconds ago

This information can be used to periodically check the elapsed time of the oldest message and force a flush if it exceeds a desired threshold.

Automatic Elasticsearch Metadata Fields

An ElasticBuffer instance can be initialized with kwargs corresponding to callable functions to add Elasticsearch metadata fields to each document added to the buffer:

>>> def my_index_func(doc): return 'my-index'
>>> def my_id_func(doc): return sum(doc.values())

>>> esbuf = ElasticBuffer(_index=my_index_func, _id=my_id_func)

>>> docs = [
        {'a': 1, 'b': 2},
        {'a': 8, 'b': 9},
    ]
>>> esbuf.add(docs)

>>> esbuf.show()

{"a": 1, "b": 2, "_index": "my-index", "_id": 3}
{"a": 8, "b": 9, "_index": "my-index", "_id": 17}

Callable kwargs add key/value pairs to each document, where the key corresponds to the name of the kwarg and the value is the function's return value. Each function must accept one argument (the document as a dict) and return one value. This also works for DataFrames, as they are transformed to documents (dicts) before applying the supplied metadata functions.

The key/value pairs are added to the top-level of each document. Note that the user need not add documents with data nested under a _source key, as metadata fields can be handled at the same level as the data fields. For further details, see the underlying Elasticsearch client bulk insert documentation on handling of metadata fields in flat dicts.

Exception Handling

For exception handing, ElasticBatch provides the base exception ElasticBatchError:

>>> from elasticbatch import ElasticBatchError

as well as the more specific ElasticBufferFlushError raised on errors flushing to Elasticsearch:

>>> from elasticbatch.exceptions import ElasticBufferFlushError

Elasticsearch exception messages can contain a copy of every document related to a failed bulk insertion request. As such messages can be very large, the verbose_errors flag can be used to optionally truncate the error message. When ElasticBuffer is initialized with verbose_errors=True, the entirety of the error message is returned. When verbose_errors=False, a shorter, descriptive message is returned. In both cases, the full, potentially verbose, exception is available via the err property on the raised ElasticBufferFlushError.

Tests

To run tests:

$ python -m unittest discover -v

The awesome green package is also highly recommended for running tests and reporting test coverage:

$ green -vvr
You might also like...
Finds, downloads, parses, and standardizes public bikeshare data into a standard pandas dataframe format

Finds, downloads, parses, and standardizes public bikeshare data into a standard pandas dataframe format.

A powerful data analysis package based on mathematical step functions.  Strongly aligned with pandas.
A powerful data analysis package based on mathematical step functions. Strongly aligned with pandas.

The leading use-case for the staircase package is for the creation and analysis of step functions. Pretty exciting huh. But don't hit the close button

Used for data processing in machine learning, and help us to construct ML model more easily from scratch

Used for data processing in machine learning, and help us to construct ML model more easily from scratch. Can be used in linear model, logistic regression model, and decision tree.

Calculate multilateral price indices in Python (with Pandas and PySpark).

IndexNumCalc Calculate multilateral price indices using the GEKS-T (CCDI), Time Product Dummy (TPD), Time Dummy Hedonic (TDH), Geary-Khamis (GK) metho

Statistical package in Python based on Pandas
Statistical package in Python based on Pandas

Pingouin is an open-source statistical package written in Python 3 and based mostly on Pandas and NumPy. Some of its main features are listed below. F

Projeto para realizar o RPA Challenge . Utilizando Python e as bibliotecas Selenium e Pandas.
Projeto para realizar o RPA Challenge . Utilizando Python e as bibliotecas Selenium e Pandas.

RPA Challenge in Python Projeto para realizar o RPA Challenge (www.rpachallenge.com), utilizando Python. O objetivo deste desafio é criar um fluxo de

Pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
Pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).

AWS Data Wrangler Pandas on AWS Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretMana

Pandas-based utility to calculate weighted means, medians, distributions, standard deviations, and more.

weightedcalcs weightedcalcs is a pandas-based Python library for calculating weighted means, medians, standard deviations, and more. Features Plays we

Pandas and Dask test helper methods with beautiful error messages.
Pandas and Dask test helper methods with beautiful error messages.

beavis Pandas and Dask test helper methods with beautiful error messages. test helpers These test helper methods are meant to be used in test suites.

Releases(v1.0.0)
Owner
Dan Kaslovsky
Dan Kaslovsky
CaterApp is a cross platform, remotely data sharing tool created for sharing files in a quick and secured manner.

CaterApp is a cross platform, remotely data sharing tool created for sharing files in a quick and secured manner. It is aimed to integrate this tool with several more features including providing a U

Ravi Prakash 3 Jun 27, 2021
Spectral Analysis in Python

SPECTRUM : Spectral Analysis in Python contributions: Please join https://github.com/cokelaer/spectrum contributors: https://github.com/cokelaer/spect

Thomas Cokelaer 280 Dec 16, 2022
Additional tools for particle accelerator data analysis and machine information

PyLHC Tools This package is a collection of useful scripts and tools for the Optics Measurements and Corrections group (OMC) at CERN. Documentation Au

PyLHC 3 Apr 13, 2022
Mortgage-loan-prediction - Show how to perform advanced Analytics and Machine Learning in Python using a full complement of PyData utilities

Mortgage-loan-prediction - Show how to perform advanced Analytics and Machine Learning in Python using a full complement of PyData utilities. This is aimed at those looking to get into the field of D

Joachim 1 Dec 26, 2021
Provide a market analysis (R)

market-study Provide a market analysis (R) - FRENCH Produisez une étude de marché Prérequis Pour effectuer ce projet, vous devrez maîtriser la manipul

1 Feb 13, 2022
Show you how to integrate Zeppelin with Airflow

Introduction This repository is to show you how to integrate Zeppelin with Airflow. The philosophy behind the ingtegration is to make the transition f

Jeff Zhang 11 Dec 30, 2022
Hatchet is a Python-based library that allows Pandas dataframes to be indexed by structured tree and graph data.

Hatchet Hatchet is a Python-based library that allows Pandas dataframes to be indexed by structured tree and graph data. It is intended for analyzing

Lawrence Livermore National Laboratory 14 Aug 19, 2022
Monitor the stability of a pandas or spark dataframe ⚙︎

Population Shift Monitoring popmon is a package that allows one to check the stability of a dataset. popmon works with both pandas and spark datasets.

ING Bank 403 Dec 07, 2022
Tools for working with MARC data in Catalogue Bridge.

catbridge_tools Tools for working with MARC data in Catalogue Bridge. Borrows heavily from PyMarc

1 Nov 11, 2021
Intake is a lightweight package for finding, investigating, loading and disseminating data.

Intake: A general interface for loading data Intake is a lightweight set of tools for loading and sharing data in data science projects. Intake helps

Intake 851 Jan 01, 2023
DaCe is a parallel programming framework that takes code in Python/NumPy and other programming languages

aCe - Data-Centric Parallel Programming Decoupling domain science from performance optimization. DaCe is a parallel programming framework that takes c

SPCL 330 Dec 30, 2022
Candlestick Pattern Recognition with Python and TA-Lib

Candlestick-Pattern-Recognition-with-Python-and-TA-Lib Goal Look at the S&P500 to try and get a better understanding of these candlestick patterns and

Ganesh Jainarain 11 Oct 07, 2022
Using Python to scrape some basic player information from www.premierleague.com and then use Pandas to analyse said data.

PremiershipPlayerAnalysis Using Python to scrape some basic player information from www.premierleague.com and then use Pandas to analyse said data. No

5 Sep 06, 2021
A powerful data analysis package based on mathematical step functions. Strongly aligned with pandas.

The leading use-case for the staircase package is for the creation and analysis of step functions. Pretty exciting huh. But don't hit the close button

48 Dec 21, 2022
ETL flow framework based on Yaml configs in Python

ETL framework based on Yaml configs in Python A light framework for creating data streams. Setting up streams through configuration in the Yaml file.

Павел Максимов 18 Jul 06, 2022
Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles

Correlation-Study-Climate-Change-EV-Adoption Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles I

Jonathan Feng 1 Jan 03, 2022
Airflow ETL With EKS EFS Sagemaker

Airflow ETL With EKS EFS & Sagemaker (en desarrollo) Diagrama de la solución Imp

1 Feb 14, 2022
ELFXtract is an automated analysis tool used for enumerating ELF binaries

ELFXtract ELFXtract is an automated analysis tool used for enumerating ELF binaries Powered by Radare2 and r2ghidra This is specially developed for PW

Monish Kumar 49 Nov 28, 2022
Calculate multilateral price indices in Python (with Pandas and PySpark).

IndexNumCalc Calculate multilateral price indices using the GEKS-T (CCDI), Time Product Dummy (TPD), Time Dummy Hedonic (TDH), Geary-Khamis (GK) metho

Dr. Usman Kayani 3 Apr 27, 2022
An orchestration platform for the development, production, and observation of data assets.

Dagster An orchestration platform for the development, production, and observation of data assets. Dagster lets you define jobs in terms of the data f

Dagster 6.2k Jan 08, 2023