Compare commits

...

2 Commits

Author SHA1 Message Date
itqop a2b59b5bf4 Fix pipeline 2024-02-27 22:42:40 +03:00
itqop 1cb9117851 Added pipeline & edit DB_URI to computed_field 2024-02-27 22:37:51 +03:00
3 changed files with 62 additions and 8 deletions

13
app.py
View File

@ -4,7 +4,7 @@ import json
settings = Settings() settings = Settings()
client = MongoDB(settings.DB_URI) client = MongoDB(str(settings.DB_URI))
db = client.client[settings.DATABASE_NAME] db = client.client[settings.DATABASE_NAME]
collection = db[settings.COLLECTION_NAME] collection = db[settings.COLLECTION_NAME]
@ -17,7 +17,14 @@ json_str = '''
''' '''
async def main(json_str): async def main(json_str):
result = await aggregate_salaries() data = json.loads(json_str)
pass dt_from = data["dt_from"]
dt_upto = data["dt_upto"]
group_type = data["group_type"]
result = await aggregate_salaries(collection, dt_from, dt_upto, group_type)
print(result['dataset'], len(result['dataset']))
print(result['labels'], len(result['labels']))
asyncio.run(main(json_str)) asyncio.run(main(json_str))

View File

@ -1,4 +1,7 @@
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import computed_field, MongoDsn
from pydantic_core import Url
class Settings(BaseSettings): class Settings(BaseSettings):
API_TOKEN_TG: str API_TOKEN_TG: str
@ -7,6 +10,11 @@ class Settings(BaseSettings):
COLLECTION_NAME: str COLLECTION_NAME: str
USERNAME_MONGO: str USERNAME_MONGO: str
PASSWORD_MONGO: str PASSWORD_MONGO: str
DB_URI: str
@computed_field
def DB_URI(self) -> MongoDsn:
return Url(
f"mongodb+srv://{self.USERNAME_MONGO}:{self.PASSWORD_MONGO}@{self.HOST_MONGODB}"
)
model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8') model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')

View File

@ -1,8 +1,47 @@
from typing import List, Tuple from typing import List, Tuple
from motor.motor_asyncio import AsyncIOMotorCollection
import datetime as dt
async def aggregate_salaries(dt_from: str, dt_upto: str, group_type: str) -> Tuple[List[int], List[str]]: async def aggregate_salaries(collection: AsyncIOMotorCollection, dt_from: str, dt_upto: str, group_type: str) -> Tuple[List[int], List[str]]: # type: ignore
group_type_format = "%Y-%m-%dT00:00:00" if group_type == "day" else "%Y-%m-01T00:00:00" if group_type == "month" else "%Y-%m-%dT%H:00:00"
pipeline = [ pipeline = [
{
"$match": {
"dt": {
"$gt": iso(dt_from),
"$lt": iso(dt_upto)
}
}
},
{
"$group": {
"_id": {
"$dateToString":
{
"format": group_type_format,
"date": "$dt"
}
},
"sum": {
"$sum": "$value"
},
}
},
{
"$sort": {
"_id": 1
}
}
] ]
return {"dataset": None, "labels": None} dataset = []
labels = []
async for document in collection.aggregate(pipeline):
dataset.append(document["sum"])
labels.append(document["_id"] )
return {"dataset": dataset, "labels": labels}
def iso(date: str) -> dt.datetime:
return dt.datetime.fromisoformat(date)