はじめに #
PythonとPandasを使って、時系列データベースInfluxDBを操作する方法についてまとめた。
InfluxDBは時系列データの扱いに特化したデータベースである。概要は以下の記事を参照。 時系列データベースInfluxDB入門
PythonでInfluxDBを操作するには、influx-python
というライブラリを使う。このライブラリは、InfluxDBを開発したinfluxdata社が公開しているものである。influx-python
を使うと、PandasのDataFrameでInfluxDBとデータを授受できる。
環境は以下の通り。
- Linux Mint 19.3 (Cinnamon)
- InfluxDB 1.8.0
また、Pythonのバージョンは以下の通り。
バージョン | |
---|---|
Python | 3.7.6 |
NumPy | 1.18.1 |
Pandas | 1.0.1 |
Influxdb-python | 5.2.2 |
ライブラリのインストール #
PythonからInfluxDBを操作するライブラリinflux-python
をインストールする。
condaの場合:
conda install -c pdrops influxdb
pipの場合:
pip install influxdb
以降では、ライブラリを以下のようにインポートすることを前提とする。
import numpy as np
import pandas as pd
import influxdb
influx-pythonの基本 #
influx-python
でInfluxDBを操作するために、次の2つのクラスが用意されている。
InfluxDBClient
: JSONを使うDataFrameClient
: PandasのDataFrameを使う
本記事では、DataFrameClient
を扱う。
DataFrameClientクラス #
DataFrameClient
クラスの主な引数は以下の通り。
influxdb.DataFrameClient(host=u'localhost', port=8086, username=u'root',
password=u'root', database=None)
host
はアクセス先のInfluxDBがあるPCのIPアドレスである。同じPCの場合はデフォルト値のlocalhost
でよい。
port
はInfluxDBのポート番号である。8086はInfluxDBのデフォルトのポート番号である。
username
, password
はInfluxDBにアクセスするためのユーザ名、パスワードである。‘root’はInfluxDBのデフォルト値と同じである。
database
は接続先のデータベース名である。後ほどメソッド実行時に指定できるので、DataFrameClient
オブジェクト作成時に指定する必要はない。
DataFrameClient
クラスの主なメソッドは次の通り。
メソッド名 | 説明 |
---|---|
create_database(<データベース名>) |
データベースを作成する |
get_list_database() |
データベース一覧を取得する |
get_list_measurements() |
measurement一覧を取得する |
drop_database(<データベース名>) |
データベースを削除する |
drop_measurement(<measurement名>) |
measurementを削除する |
write_points() |
DataFrameを書き込む |
query() |
データを取得する |
write_points()
とquery()
については、以降で詳しく述べる。
InfluxDBへの接続 #
DataFrameClient
クラスを使って、InfluxDBへ接続する。まず、DataFrameClient
オブジェクトを作成し、pd_test
という名前のデータベースを作成する。
client = influxdb.DataFrameClient()
client.create_database("pd_test")
データベースを既に作成している場合は、以下のようにしても良い。
client = influxdb.DataFrameClient(database="pd_test")
DataFrameをInfluxDBに書き込む #
PandasのDataFrameをInfluxDBに書き込むには、write_points
メソッドを用いる。
DataFrameClient.write_points(dataframe, measurement,database=None)
引数の説明は次の通り。
dataframe
: PandasのDataFramemeasurement
: データを追加するmeasurement(表の名前)database
: データベース名。
ここで、DataFrameClient
オブジェクトにデータベースが設定されていない場合、database
は必須である。database
を指定しないと、write_points
メソッド実行時に以下のエラーが出る。
nfluxDBClientError: 400: {"error":"database is required"}
一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_points
メソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。
例として、PandasのDataFrameを、先ほど作成したpd_test
データベースに追加する。DataFrameは、10行×2列の大きさで、2020年4月1日から1日周期でインデックスを振っている。ここで、measurement名はmeas1
とした。
array = np.arange(20).reshape(-1, 2)
index = pd.date_range(pd.Timestamp("20200401"), freq="1D", periods=10)
df = pd.DataFrame(array, index=index, columns=["A", "B"])
client.write_points(df, "meas1", database="pd_test")
DataFrameをInfluxDBから取得する #
InfluxDBからデータを取得するには、query
メソッドを用いる。
DataFrameClient.query(query, database=None)
引数の説明は以下の通り。
query
: InfluxDBのクエリ文。database
: データベース名。
クエリ文の構文の詳細については、別記事で説明予定。
ここで、DataFrameClient
オブジェクトにデータベースが設定されていない場合、database
は必須である。一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_points
メソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。
また、query
メソッドの戻り値は辞書型(正確にはdefaultdict)であり、キーがmeasurement, 値がデータフレームになる。
例として、先程write_points
メソッドで書き込んだデータを取得する。
q1 = "SELECT * FROM meas1"
res = client.query(q1, database="pd_test")
print(res)
ここで、クエリ文
SELECT * FROM meas1
は、measurement meas1
の全てのfieldを取得することを表す。
実行結果:queryメソッドの戻り値res
は辞書型であり、キーがmeasurement meas1
, 値がDataFrameとなる。また、DataFrameのindex
はDateTimeIndex
, columns
はInfluxDBのfieldとなる。
defaultdict(<class 'list'>, {'meas1': A B
2020-04-01 00:00:00+00:00 0 1
2020-04-02 00:00:00+00:00 2 3
2020-04-03 00:00:00+00:00 4 5
2020-04-04 00:00:00+00:00 6 7
2020-04-05 00:00:00+00:00 8 9
2020-04-06 00:00:00+00:00 10 11
2020-04-07 00:00:00+00:00 12 13
2020-04-08 00:00:00+00:00 14 15
2020-04-09 00:00:00+00:00 16 17
2020-04-10 00:00:00+00:00 18 19})
なお、DataFrameを直接取得する場合には、
df = list(res.values())[0]
などのようにする。