This library is an ongoing effort towards bringing the data exchanging ability between Java/Scala and Python

Overview

PyJava

This library is an ongoing effort towards bringing the data exchanging ability between Java/Scala and Python. PyJava introduces Apache Arrow as the exchanging data format, this means we can avoid ser/der between Java/Scala and Python which can really speed up the communication efficiency than traditional way.

When you invoke python code in Java/Scala side, PyJava will start some python workers automatically and send the data to python worker, and once they are processed, send them back. The python workers are reused
by default.

The initial code in this lib is from Apache Spark.

Install

Setup python(>= 3.6) Env(Conda is recommended):

pip uninstall pyjava && pip install pyjava

Setup Java env(Maven is recommended):

For Scala 2.11/Spark 2.4.3

<dependency>
    <groupId>tech.mlsqlgroupId>
    <artifactId>pyjava-2.4_2.11artifactId>
    <version>0.3.2version>
dependency>

For Scala 2.12/Spark 3.1.1

<dependency>
    <groupId>tech.mlsqlgroupId>
    <artifactId>pyjava-3.0_2.12artifactId>
    <version>0.3.2version>
dependency>

Build Mannually

Install Build Tool:

pip install mlsql_plugin_tool

Build for Spark 3.1.1:

mlsql_plugin_tool spark311
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Build For Spark 2.4.3

mlsql_plugin_tool spark243
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Using python code snippet to process data in Java/Scala

With pyjava, you can run any python code in your Java/Scala application.

sourceEnconder.toRow(irow).copy() }.iterator // run the code and get the return result val javaConext = new JavaContext val commonTaskContext = new AppContextImpl(javaConext, batch) val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext) //f.copy(), copy function is required columnarBatchIter.flatMap { batch => batch.rowIterator.asScala }.foreach(f => println(f.copy())) javaConext.markComplete javaConext.close ">
val envs = new util.HashMap[String, String]()
// prepare python environment
envs.put(str(PythonConf.PYTHON_ENV), "source activate dev && export ARROW_PRE_0_15_IPC_FORMAT=1 ")

// describe the data which will be transfered to python 
val sourceSchema = StructType(Seq(StructField("value", StringType)))

val batch = new ArrowPythonRunner(
  Seq(ChainedPythonFunctions(Seq(PythonFunction(
    """
      |import pandas as pd
      |import numpy as np
      |
      |def process():
      |    for item in context.fetch_once_as_rows():
      |        item["value1"] = item["value"] + "_suffix"
      |        yield item
      |
      |context.build_result(process())
    """.stripMargin, envs, "python", "3.6")))), sourceSchema,
  "GMT", Map()
)

// prepare data
val sourceEnconder = RowEncoder.apply(sourceSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
sourceEnconder.toRow(irow).copy()
}.iterator

// run the code and get the return result
val javaConext = new JavaContext
val commonTaskContext = new AppContextImpl(javaConext, batch)
val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)

//f.copy(), copy function is required 
columnarBatchIter.flatMap { batch =>
  batch.rowIterator.asScala
}.foreach(f => println(f.copy()))
javaConext.markComplete
javaConext.close

Using python code snippet to process data in Spark

val enconder = RowEncoder.apply(struct).resolveAndBind() val envs = new util.HashMap[String, String]() envs.put(str(PythonConf.PYTHON_ENV), "source activate streamingpro-spark-2.4.x") val batch = new ArrowPythonRunner( Seq(ChainedPythonFunctions(Seq(PythonFunction( """ |import pandas as pd |import numpy as np |for item in data_manager.fetch_once(): | print(item) |df = pd.DataFrame({'AAA': [4, 5, 6, 7],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]}) |data_manager.set_output([[df['AAA'],df['BBB']]]) """.stripMargin, envs, "python", "3.6")))), struct, timezoneid, Map() ) val newIter = iter.map { irow => enconder.toRow(irow) } val commonTaskContext = new SparkContextImp(TaskContext.get(), batch) val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext) columnarBatchIter.flatMap { batch => batch.rowIterator.asScala.map(_.copy) } } val wow = SparkUtils.internalCreateDataFrame(session, abc, StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType))), false) wow.show() ">
val session = spark
import session.implicits._
val timezoneid = session.sessionState.conf.sessionLocalTimeZone
val df = session.createDataset[String](Seq("a1", "b1")).toDF("value")
val struct = df.schema
val abc = df.rdd.mapPartitions { iter =>
  val enconder = RowEncoder.apply(struct).resolveAndBind()
  val envs = new util.HashMap[String, String]()
  envs.put(str(PythonConf.PYTHON_ENV), "source activate streamingpro-spark-2.4.x")
  val batch = new ArrowPythonRunner(
    Seq(ChainedPythonFunctions(Seq(PythonFunction(
      """
        |import pandas as pd
        |import numpy as np
        |for item in data_manager.fetch_once():
        |    print(item)
        |df = pd.DataFrame({'AAA': [4, 5, 6, 7],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
        |data_manager.set_output([[df['AAA'],df['BBB']]])
      """.stripMargin, envs, "python", "3.6")))), struct,
    timezoneid, Map()
  )
  val newIter = iter.map { irow =>
    enconder.toRow(irow)
  }
  val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)
  val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)
  columnarBatchIter.flatMap { batch =>
    batch.rowIterator.asScala.map(_.copy)
  }
}

val wow = SparkUtils.internalCreateDataFrame(session, abc, StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType))), false)
wow.show()

Run Python Project

With Pyjava, you can tell the system where is the python project and which is then entrypoint, then you can run this project in Java/Scala.

"/tmp/data", "tempModelLocalPath" -> "/tmp/model" )) output.foreach(println) ">
import tech.mlsql.arrow.python.runner.PythonProjectRunner

val runner = new PythonProjectRunner("./pyjava/examples/pyproject1", Map())
val output = runner.run(Seq("bash", "-c", "source activate dev && python train.py"), Map(
  "tempDataLocalPath" -> "/tmp/data",
  "tempModelLocalPath" -> "/tmp/model"
))
output.foreach(println)

Example In MLSQL

None Interactive Mode:

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python conf "schema=st(field(a,long),field(b,long))";

select 1 as a as table1;

!python on table1 '''

import pandas as pd
import numpy as np
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])

''' named mlsql_temp_table2;

select * from mlsql_temp_table2 as output; 

Interactive Mode:

!python start;

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python env "schema=st(field(a,integer),field(b,integer))";


!python '''
import pandas as pd
import numpy as np
''';

!python  '''
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])
''';
!python close;

Using PyJava as Arrow Server/Client

Java Server side:

enconder.toRow(irow) }.iterator val javaConext = new JavaContext val commonTaskContext = new AppContextImpl(javaConext, null) val Array(_, host, port) = socketRunner.serveToStreamWithArrow(newIter, dataSchema, 10, commonTaskContext) println(s"${host}:${port}") Thread.currentThread().join() ">
val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")

val dataSchema = StructType(Seq(StructField("value", StringType)))
val enconder = RowEncoder.apply(dataSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  enconder.toRow(irow)
}.iterator
val javaConext = new JavaContext
val commonTaskContext = new AppContextImpl(javaConext, null)

val Array(_, host, port) = socketRunner.serveToStreamWithArrow(newIter, dataSchema, 10, commonTaskContext)
println(s"${host}:${port}")
Thread.currentThread().join()

Python Client side:

import os
import socket

from pyjava.serializers import \
    ArrowStreamPandasSerializer

out_ser = ArrowStreamPandasSerializer(None, True, True)

out_ser = ArrowStreamPandasSerializer("Asia/Harbin", False, None)
HOST = ""
PORT = -1
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
    kk = out_ser.load_stream(infile)
    for item in kk:
        print(item)

Python Server side:

import os

import pandas as pd

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
from pyjava.api.serve import OnceServer

ddata = pd.DataFrame(data=[[1, 2, 3, 4], [2, 3, 4, 5]])

server = OnceServer("127.0.0.1", 11111, "Asia/Harbin")
server.bind()
server.serve([{'id': 9, 'label': 1}])

Java Client side:

println(enconder.fromRow(i.copy()))) javaConext.close ">
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import tech.mlsql.arrow.python.iapp.{AppContextImpl, JavaContext}
import tech.mlsql.arrow.python.runner.SparkSocketRunner
import tech.mlsql.common.utils.network.NetUtils

val enconder = RowEncoder.apply(StructType(Seq(StructField("a", LongType),StructField("b", LongType)))).resolveAndBind()
val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")
val javaConext = new JavaContext
val commonTaskContext = new AppContextImpl(javaConext, null)
val iter = socketRunner.readFromStreamWithArrow("127.0.0.1", 11111, commonTaskContext)
iter.foreach(i => println(enconder.fromRow(i.copy())))
javaConext.close

How to configure python worker runs in Docker (todo)

Owner
Byzer
Let data speak.
Byzer
A subleq VM/interpreter created by me for no reason

What is Dumbleq? Dumbleq is a dumb Subleq VM/interpreter implementation created by me for absolutely no reason at all. What is Subleq? If you haven't

Phu Minh 2 Nov 13, 2022
Craxk is a SINGLE AND NON-REPLICABLE Hash that uses data from the hardware where it is executed to form a hash that can only be reproduced by a single machine.

What is Craxk ? Craxk is a UNIQUE AND NON-REPLICABLE Hash that uses data from the hardware where it is executed to form a hash that can only be reprod

5 Jun 19, 2021
Contains a Jupyter Notebook for calculating remaining plants required based on field/lathhouse data.

Davis-Sunflowers-Su21 Project goals: Plants influence their reproduction and mating system in many ways. Various factors such as time of flowering, ab

1 Feb 10, 2022
Vita Specific Patches and Application for Doki Doki Literature Club (Steam Version) using Ren'Py PSVita

Doki-Doki-Literature-Club-Vita Vita Specific Patches and Application for Doki Doki Literature Club (Steam Version) using Ren'Py PSVita Contains: Modif

Jaylon Gowie 25 Dec 30, 2022
Python Common things by Problem Fighter Library, (Exception, Debug Log, etc.)

In the name of God, the Most Gracious, the Most Merciful. PF-PY-Common Documentation Install and update using pip: pip install -U xxxx Please find the

Problem Fighter 3 Jan 15, 2022
BridgeWalk is a partially-observed reinforcement learning environment with dynamics of varying stochasticity.

BridgeWalk is a partially-observed reinforcement learning environment with dynamics of varying stochasticity. The player needs to walk along a bridge to reach a goal location. When the player walks o

Danijar Hafner 6 Jun 13, 2022
A topology optimization framework written in Taichi programming language, which is embedded in Python.

Taichi TopOpt (Under Active Development) Intro A topology optimization framework written in Taichi programming language, which is embedded in Python.

Li Zhehao 41 Nov 17, 2022
A python implementation of differentiable quality diversity.

Differentiable Quality Diversity This repository is the official implementation of Differentiable Quality Diversity.

ICAROS 41 Nov 30, 2022
Interactive class notebooks for ECE4076 Computer Vision, weeks 1 - 6

ECE4076 Interactive class notebooks for ECE4076 Computer Vision, weeks 1 - 6. ECE4076 is a computer vision unit at Monash University, covering both cl

Michael Burke 9 Jun 16, 2022
Cylc: a workflow engine for cycling systems

Cylc: a workflow engine for cycling systems. Repository master branch: core meta-scheduler component of cylc-8 (in development); Repository 7.8.x branch: full cylc-7 system.

The Cylc Workflow Engine 205 Dec 20, 2022
Easily Generate Revolut Business Cards

RevBusinessCardGen Easily Generate Revolut Business Cards Prerequisites Before you begin, ensure you have met the following requirements: You have ins

Younes™ 35 Dec 14, 2022
a simple thing that i made for fun :trollface:

we-do-a-little-trolling about a simple thing that i made for fun. requirements and instructions first you need to install obs , then start the virtual

ranon rat 6 Jul 15, 2022
Utility to play with ADCS, allows to request tickets and collect information about related objects

certi Utility to play with ADCS, allows to request tickets and collect information about related objects. Basically, it's the impacket copy of Certify

Eloy 185 Dec 29, 2022
All you need to understand CRUD and MVP in DRF

Book-Store-API This an API which has been put in place just to make you order for books, upload books with price, image and all, pay and automtically

Oladipo Adesiyan 6 Jul 03, 2022
A Guide for Feature Engineering and Feature Selection, with implementations and examples in Python.

Feature Engineering & Feature Selection A comprehensive guide [pdf] [markdown] for Feature Engineering and Feature Selection, with implementations and

Yimeng.Zhang 968 Dec 29, 2022
Back-end API for the reternal framework

RE:TERNAL RE:TERNAL is a centralised purple team simulation platform. Reternal uses agents installed on a simulation network to execute various known

Joey Dreijer 7 Apr 15, 2022
Attempt at creating organized collection of little handy snippets of code I'm receiving along the way

ChaosCode Attempt at creating organized collection of little handy snippets of code I'm receiving along the way I always considered coding and program

INFU 4 Nov 26, 2022
Start and stop your NiceHash miners using this script.

NiceHash Mining Scheduler Use this script to schedule your NiceHash Miner(s). Electricity costs between 4-9pm are high in my area and I want NiceHash

SeaRoth 2 Sep 30, 2022
Blender 2.80+ Timelapse Capture Tool Addon

SimpleTimelapser Blender 2.80+ Timelapse Capture Tool Addon Developed for Blender 3.0.0, tested working on 2.80.0 It's no ZBrush undo history but it's

4 Jan 19, 2022
đź—˝ Like yarn outdated/upgrade, but for pip. Upgrade all your pip packages and automate your Python Dependency Management.

pipupgrade The missing command for pip Table of Contents Features Quick Start Usage Basic Usage Docker Environment Variables FAQ License Features Upda

Achilles Rasquinha 529 Dec 31, 2022